#![doc(
    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![warn(missing_docs)]
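//! A comparable row-oriented representation of a collection of [`Array`]s.
//!
//! [`RowConverter`] encodes a set of columns into [`Rows`], a variable-length byte
//! encoding with the property that comparing the bytes of two rows yields the same
//! ordering as lexicographically comparing the corresponding column values under the
//! configured [`SortOptions`].
//!
//! A minimal usage sketch, assuming this crate is available as `arrow_row` and that
//! `arrow_array` / `arrow_schema` provide the column types (error handling elided):
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
//! # use arrow_schema::DataType;
//! # use arrow_row::{RowConverter, SortField};
//! // One SortField per input column, in column order
//! let converter = RowConverter::new(vec![
//!     SortField::new(DataType::Int32),
//!     SortField::new(DataType::Utf8),
//! ])
//! .unwrap();
//!
//! let cols = [
//!     Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef,
//!     Arc::new(StringArray::from(vec!["b", "a", "c"])) as ArrayRef,
//! ];
//!
//! // Encode the columns into the row format
//! let rows = converter.convert_columns(&cols).unwrap();
//!
//! // Comparing rows is equivalent to comparing the original column values
//! assert!(rows.row(1) < rows.row(2));
//! assert!(rows.row(2) < rows.row(0));
//!
//! // Rows can be converted back into the original columns
//! let back = converter.convert_rows(&rows).unwrap();
//! for (actual, expected) in back.iter().zip(&cols) {
//!     assert_eq!(actual, expected);
//! }
//! ```
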
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};

use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::variable::{decode_binary, decode_string};
use arrow_array::types::{Int16Type, Int32Type, Int64Type};

mod fixed;
mod list;
mod run;
mod variable;

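/// Converts [`ArrayRef`] columns into a comparable row-oriented format
/// ([`Rows`]), and back again.
///
/// A converter is created from one [`SortField`] per input column. The
/// resulting rows can only be decoded by, and meaningfully compared with, rows
/// produced by a converter with the same fields.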
#[derive(Debug)]
pub struct RowConverter {
    fields: Arc<[SortField]>,
    /// Stateful codec for each field, in the same order as `fields`
    codecs: Vec<Codec>,
}

#[derive(Debug)]
enum Codec {
    /// No additional state is needed to encode or decode this field
    Stateless,
    /// A row converter for the dictionary values, and the encoding of a row
    /// containing only nulls
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields, and the encoding of a row
    /// containing only nulls
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child field
    List(RowConverter),
    /// A row converter for the values of a run-end encoded array
    RunEndEncoded(RowConverter),
}

394impl Codec {
395 fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
396 match &sort_field.data_type {
397 DataType::Dictionary(_, values) => {
398 let sort_field =
399 SortField::new_with_options(values.as_ref().clone(), sort_field.options);
400
401 let converter = RowConverter::new(vec![sort_field])?;
402 let null_array = new_null_array(values.as_ref(), 1);
403 let nulls = converter.convert_columns(&[null_array])?;
404
405 let owned = OwnedRow {
406 data: nulls.buffer.into(),
407 config: nulls.config,
408 };
409 Ok(Self::Dictionary(converter, owned))
410 }
411 DataType::RunEndEncoded(_, values) => {
412 let options = SortOptions {
414 descending: false,
415 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
416 };
417
418 let field = SortField::new_with_options(values.data_type().clone(), options);
419 let converter = RowConverter::new(vec![field])?;
420 Ok(Self::RunEndEncoded(converter))
421 }
422 d if !d.is_nested() => Ok(Self::Stateless),
423 DataType::List(f) | DataType::LargeList(f) => {
424 let options = SortOptions {
428 descending: false,
429 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
430 };
431
432 let field = SortField::new_with_options(f.data_type().clone(), options);
433 let converter = RowConverter::new(vec![field])?;
434 Ok(Self::List(converter))
435 }
436 DataType::Struct(f) => {
437 let sort_fields = f
438 .iter()
439 .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
440 .collect();
441
442 let converter = RowConverter::new(sort_fields)?;
443 let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
444
445 let nulls = converter.convert_columns(&nulls)?;
446 let owned = OwnedRow {
447 data: nulls.buffer.into(),
448 config: nulls.config,
449 };
450
451 Ok(Self::Struct(converter, owned))
452 }
453 _ => Err(ArrowError::NotYetImplemented(format!(
454 "not yet implemented: {:?}",
455 sort_field.data_type
456 ))),
457 }
458 }
459
460 fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
461 match self {
462 Codec::Stateless => Ok(Encoder::Stateless),
463 Codec::Dictionary(converter, nulls) => {
464 let values = array.as_any_dictionary().values().clone();
465 let rows = converter.convert_columns(&[values])?;
466 Ok(Encoder::Dictionary(rows, nulls.row()))
467 }
468 Codec::Struct(converter, null) => {
469 let v = as_struct_array(array);
470 let rows = converter.convert_columns(v.columns())?;
471 Ok(Encoder::Struct(rows, null.row()))
472 }
473 Codec::List(converter) => {
474 let values = match array.data_type() {
475 DataType::List(_) => as_list_array(array).values(),
476 DataType::LargeList(_) => as_large_list_array(array).values(),
477 _ => unreachable!(),
478 };
479 let rows = converter.convert_columns(&[values.clone()])?;
480 Ok(Encoder::List(rows))
481 }
482 Codec::RunEndEncoded(converter) => {
483 let values = match array.data_type() {
484 DataType::RunEndEncoded(r, _) => match r.data_type() {
485 DataType::Int16 => array.as_run::<Int16Type>().values(),
486 DataType::Int32 => array.as_run::<Int32Type>().values(),
487 DataType::Int64 => array.as_run::<Int64Type>().values(),
488 _ => unreachable!("Unsupported run end index type: {r:?}"),
489 },
490 _ => unreachable!(),
491 };
492 let rows = converter.convert_columns(&[values.clone()])?;
493 Ok(Encoder::RunEndEncoded(rows))
494 }
495 }
496 }
497
498 fn size(&self) -> usize {
499 match self {
500 Codec::Stateless => 0,
501 Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
502 Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
503 Codec::List(converter) => converter.size(),
504 Codec::RunEndEncoded(converter) => converter.size(),
505 }
506 }
507}
508
#[derive(Debug)]
enum Encoder<'a> {
    Stateless,
    Dictionary(Rows, Row<'a>),
    Struct(Rows, Row<'a>),
    List(Rows),
    RunEndEncoded(Rows),
}

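/// Configuration for a column taking part in row conversion: its [`DataType`]
/// and the [`SortOptions`] used when encoding it.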
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options
    options: SortOptions,
    /// Data type
    data_type: DataType,
}

impl SortField {
    /// Create a new column with the given data type and default [`SortOptions`]
    pub fn new(data_type: DataType) -> Self {
        Self::new_with_options(data_type, Default::default())
    }

    /// Create a new column with the given data type and [`SortOptions`]
    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
        Self { options, data_type }
    }

    /// Return the size of this instance in bytes
    ///
    /// Includes the size of `Self` and any heap allocations held by the
    /// [`DataType`]
    pub fn size(&self) -> usize {
        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
    }
}

impl RowConverter {
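    /// Creates a new [`RowConverter`] with the provided schema.
    ///
    /// Returns an error if any of the [`SortField`]s have a data type that is
    /// not supported (see [`RowConverter::supports_fields`]).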
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

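    /// Returns whether all of the given [`SortField`]s are supported by the
    /// row format.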
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    fn supports_datatype(d: &DataType) -> bool {
        match d {
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
            _ => false,
        }
    }

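    /// Converts [`ArrayRef`] columns into [`Rows`].
    ///
    /// Returns an error if the number or types of `columns` do not match the
    /// [`SortField`]s provided to [`RowConverter::new`].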
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

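    /// Converts [`ArrayRef`] columns, appending the encoded rows to an
    /// existing [`Rows`].
    ///
    /// Returns an error if the number or types of `columns` do not match this
    /// converter's [`SortField`]s.
    ///
    /// # Panics
    ///
    /// Panics if `rows` was not created by this [`RowConverter`].
    ///
    /// A sketch of incremental encoding, using the same crate paths as the
    /// crate-level example (the capacities passed to
    /// [`RowConverter::empty_rows`] are illustrative guesses, not requirements):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// # use arrow_row::{RowConverter, SortField};
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    ///
    /// // Pre-allocate space for roughly 10 rows / 128 bytes of row data
    /// let mut rows = converter.empty_rows(10, 128);
    ///
    /// // Encode two batches into the same Rows
    /// for batch in [vec![1, 2, 3], vec![4, 5]] {
    ///     let col = Arc::new(Int32Array::from(batch)) as ArrayRef;
    ///     converter.append(&mut rows, &[col]).unwrap();
    /// }
    /// assert_eq!(rows.num_rows(), 5);
    /// ```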
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
633 assert!(
634 Arc::ptr_eq(&rows.config.fields, &self.fields),
635 "rows were not produced by this RowConverter"
636 );
637
638 if columns.len() != self.fields.len() {
639 return Err(ArrowError::InvalidArgumentError(format!(
640 "Incorrect number of arrays provided to RowConverter, expected {} got {}",
641 self.fields.len(),
642 columns.len()
643 )));
644 }
645
646 let encoders = columns
647 .iter()
648 .zip(&self.codecs)
649 .zip(self.fields.iter())
650 .map(|((column, codec), field)| {
651 if !column.data_type().equals_datatype(&field.data_type) {
652 return Err(ArrowError::InvalidArgumentError(format!(
653 "RowConverter column schema mismatch, expected {} got {}",
654 field.data_type,
655 column.data_type()
656 )));
657 }
658 codec.encoder(column.as_ref())
659 })
660 .collect::<Result<Vec<_>, _>>()?;
661
662 let write_offset = rows.num_rows();
663 let lengths = row_lengths(columns, &encoders);
664 let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
665 rows.buffer.resize(total, 0);
666
667 for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
668 encode_column(
670 &mut rows.buffer,
671 &mut rows.offsets[write_offset..],
672 column.as_ref(),
673 field.options,
674 &encoder,
675 )
676 }
677
678 if cfg!(debug_assertions) {
679 assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
680 rows.offsets
681 .windows(2)
682 .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
683 }
684
685 Ok(())
686 }
687
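    /// Converts [`Rows`] back into [`ArrayRef`] columns.
    ///
    /// # Panics
    ///
    /// Panics if any of the rows were not produced by this [`RowConverter`].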
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
697 let mut validate_utf8 = false;
698 let mut rows: Vec<_> = rows
699 .into_iter()
700 .map(|row| {
701 assert!(
702 Arc::ptr_eq(&row.config.fields, &self.fields),
703 "rows were not produced by this RowConverter"
704 );
705 validate_utf8 |= row.config.validate_utf8;
706 row.data
707 })
708 .collect();
709
710 unsafe { self.convert_raw(&mut rows, validate_utf8) }
714 }
715
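    /// Creates an empty [`Rows`] matching this converter's fields, with space
    /// pre-allocated for `row_capacity` rows and `data_capacity` bytes of row
    /// data.
    ///
    /// This can be combined with [`RowConverter::append`] to encode data
    /// incrementally (see the example on [`RowConverter::append`]).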
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
745 let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
746 offsets.push(0);
747
748 Rows {
749 offsets,
750 buffer: Vec::with_capacity(data_capacity),
751 config: RowConfig {
752 fields: self.fields.clone(),
753 validate_utf8: false,
754 },
755 }
756 }
757
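    /// Creates a [`Rows`] instance from row bytes stored in a [`BinaryArray`],
    /// as produced by [`Rows::try_into_binary`].
    ///
    /// The bytes are assumed to be valid row data for this converter's fields;
    /// UTF-8 validation is re-enabled when the rows are decoded.
    ///
    /// # Panics
    ///
    /// Panics if `array` contains nulls.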
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
785 assert_eq!(
786 array.null_count(),
787 0,
788 "can't construct Rows instance from array with nulls"
789 );
790 Rows {
791 buffer: array.values().to_vec(),
792 offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
793 config: RowConfig {
794 fields: Arc::clone(&self.fields),
795 validate_utf8: true,
796 },
797 }
798 }
799
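    /// Converts raw row bytes back into [`ArrayRef`]s.
    ///
    /// # Safety
    ///
    /// `rows` must contain valid row data produced for this converter's
    /// fields; `validate_utf8` should be true unless the data is already known
    /// to be valid UTF-8 where the fields require it.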
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
810 self.fields
811 .iter()
812 .zip(&self.codecs)
813 .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
814 .collect()
815 }
816
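    /// Returns a [`RowParser`] that can be used to parse [`Row`]s from bytes
    /// previously produced by this converter.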
    pub fn parser(&self) -> RowParser {
819 RowParser::new(Arc::clone(&self.fields))
820 }
821
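    /// Returns the memory usage of this [`RowConverter`] in bytes, including
    /// its fields and per-field codec state.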
    pub fn size(&self) -> usize {
826 std::mem::size_of::<Self>()
827 + self.fields.iter().map(|x| x.size()).sum::<usize>()
828 + self.codecs.capacity() * std::mem::size_of::<Codec>()
829 + self.codecs.iter().map(Codec::size).sum::<usize>()
830 }
831}
832
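/// A [`RowParser`] created by [`RowConverter::parser`] re-interprets raw
/// bytes, e.g. those returned by [`Row::data`], as [`Row`]s for that
/// converter.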
#[derive(Debug)]
pub struct RowParser {
    config: RowConfig,
}
838
839impl RowParser {
840 fn new(fields: Arc<[SortField]>) -> Self {
841 Self {
842 config: RowConfig {
843 fields,
844 validate_utf8: true,
845 },
846 }
847 }
848
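    /// Creates a [`Row`] from the given `bytes`.
    ///
    /// `bytes` must have been produced by [`Row::data`] on a row from a
    /// [`RowConverter`] with the same fields as the converter this parser was
    /// created from; parsing unrelated bytes may cause later decoding to panic.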
    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
854 Row {
855 data: bytes,
856 config: &self.config,
857 }
858 }
859}
860
861#[derive(Debug, Clone)]
863struct RowConfig {
864 fields: Arc<[SortField]>,
866 validate_utf8: bool,
868}
869
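/// A row-oriented representation of arrow data, produced by
/// [`RowConverter::convert_columns`] or built up incrementally with
/// [`RowConverter::empty_rows`] and [`RowConverter::append`].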
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
882
impl Rows {
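    /// Appends a [`Row`] to this [`Rows`].
    ///
    /// # Panics
    ///
    /// Panics if `row` was not produced by a [`RowConverter`] with the same
    /// fields as the one that created this [`Rows`].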
    pub fn push(&mut self, row: Row<'_>) {
886 assert!(
887 Arc::ptr_eq(&row.config.fields, &self.config.fields),
888 "row was not produced by this RowConverter"
889 );
890 self.config.validate_utf8 |= row.config.validate_utf8;
891 self.buffer.extend_from_slice(row.data);
892 self.offsets.push(self.buffer.len())
893 }
894
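    /// Returns the row at index `row`.
    ///
    /// # Panics
    ///
    /// Panics if `row` is out of bounds.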
    pub fn row(&self, row: usize) -> Row<'_> {
897 assert!(row + 1 < self.offsets.len());
898 unsafe { self.row_unchecked(row) }
899 }
900
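    /// Returns the row at `index` without bounds checking.
    ///
    /// # Safety
    ///
    /// `index` must be less than [`Rows::num_rows`].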
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
906 let end = unsafe { self.offsets.get_unchecked(index + 1) };
907 let start = unsafe { self.offsets.get_unchecked(index) };
908 let data = unsafe { self.buffer.get_unchecked(*start..*end) };
909 Row {
910 data,
911 config: &self.config,
912 }
913 }
914
915 pub fn clear(&mut self) {
917 self.offsets.truncate(1);
918 self.buffer.clear();
919 }
920
921 pub fn num_rows(&self) -> usize {
923 self.offsets.len() - 1
924 }
925
926 pub fn iter(&self) -> RowsIter<'_> {
928 self.into_iter()
929 }
930
931 pub fn size(&self) -> usize {
935 std::mem::size_of::<Self>()
937 + self.buffer.len()
938 + self.offsets.len() * std::mem::size_of::<usize>()
939 }
940
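    /// Converts this [`Rows`] into a [`BinaryArray`] holding one row per
    /// element, which can be round-tripped through
    /// [`RowConverter::from_binary`].
    ///
    /// Returns an error if the total row data exceeds `i32::MAX` bytes, the
    /// maximum an i32-indexed [`BinaryArray`] can hold.
    ///
    /// A round-trip sketch, using the same crate paths as the crate-level
    /// example (values are illustrative):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// # use arrow_row::{RowConverter, SortField};
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![7, 3, 5])) as ArrayRef;
    ///
    /// // Encode, then serialize the rows as a BinaryArray
    /// let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    /// let binary = rows.try_into_binary().unwrap();
    ///
    /// // Deserialize and decode back into the original column
    /// let rows = converter.from_binary(binary);
    /// let back = converter.convert_rows(&rows).unwrap();
    /// assert_eq!(&back[0], &col);
    /// ```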
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
971 if self.buffer.len() > i32::MAX as usize {
972 return Err(ArrowError::InvalidArgumentError(format!(
973 "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
974 self.buffer.len()
975 )));
976 }
977 let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
979 let array = unsafe {
981 BinaryArray::new_unchecked(
982 OffsetBuffer::new_unchecked(offsets_scalar),
983 Buffer::from_vec(self.buffer),
984 None,
985 )
986 };
987 Ok(array)
988 }
989}
990
991impl<'a> IntoIterator for &'a Rows {
992 type Item = Row<'a>;
993 type IntoIter = RowsIter<'a>;
994
995 fn into_iter(self) -> Self::IntoIter {
996 RowsIter {
997 rows: self,
998 start: 0,
999 end: self.num_rows(),
1000 }
1001 }
1002}
1003
1004#[derive(Debug)]
1006pub struct RowsIter<'a> {
1007 rows: &'a Rows,
1008 start: usize,
1009 end: usize,
1010}
1011
1012impl<'a> Iterator for RowsIter<'a> {
1013 type Item = Row<'a>;
1014
1015 fn next(&mut self) -> Option<Self::Item> {
1016 if self.end == self.start {
1017 return None;
1018 }
1019
1020 let row = unsafe { self.rows.row_unchecked(self.start) };
1022 self.start += 1;
1023 Some(row)
1024 }
1025
1026 fn size_hint(&self) -> (usize, Option<usize>) {
1027 let len = self.len();
1028 (len, Some(len))
1029 }
1030}
1031
1032impl ExactSizeIterator for RowsIter<'_> {
1033 fn len(&self) -> usize {
1034 self.end - self.start
1035 }
1036}
1037
1038impl DoubleEndedIterator for RowsIter<'_> {
1039 fn next_back(&mut self) -> Option<Self::Item> {
1040 if self.end == self.start {
1041 return None;
1042 }
        self.end -= 1;
        let row = unsafe { self.rows.row_unchecked(self.end) };
1046 Some(row)
1047 }
1048}
1049
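/// A borrowed view of a single encoded row, as returned by [`Rows::row`].
///
/// Comparison and hashing operate directly on the encoded bytes, so they are
/// only meaningful between rows produced with the same fields and
/// [`SortOptions`].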
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    data: &'a [u8],
    config: &'a RowConfig,
}
1063
1064impl<'a> Row<'a> {
1065 pub fn owned(&self) -> OwnedRow {
1067 OwnedRow {
1068 data: self.data.into(),
1069 config: self.config.clone(),
1070 }
1071 }
1072
1073 pub fn data(&self) -> &'a [u8] {
1075 self.data
1076 }
1077}
1078
1079impl PartialEq for Row<'_> {
1082 #[inline]
1083 fn eq(&self, other: &Self) -> bool {
1084 self.data.eq(other.data)
1085 }
1086}
1087
1088impl Eq for Row<'_> {}
1089
1090impl PartialOrd for Row<'_> {
1091 #[inline]
1092 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1093 Some(self.cmp(other))
1094 }
1095}
1096
1097impl Ord for Row<'_> {
1098 #[inline]
1099 fn cmp(&self, other: &Self) -> Ordering {
1100 self.data.cmp(other.data)
1101 }
1102}
1103
1104impl Hash for Row<'_> {
1105 #[inline]
1106 fn hash<H: Hasher>(&self, state: &mut H) {
1107 self.data.hash(state)
1108 }
1109}
1110
1111impl AsRef<[u8]> for Row<'_> {
1112 #[inline]
1113 fn as_ref(&self) -> &[u8] {
1114 self.data
1115 }
1116}
1117
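/// An owned version of [`Row`] that stores a copy of the encoded bytes.
///
/// Created by [`Row::owned`]; [`OwnedRow::row`] returns a borrowed [`Row`]
/// view.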
#[derive(Debug, Clone)]
pub struct OwnedRow {
    data: Box<[u8]>,
    config: RowConfig,
}
1126
1127impl OwnedRow {
1128 pub fn row(&self) -> Row<'_> {
1132 Row {
1133 data: &self.data,
1134 config: &self.config,
1135 }
1136 }
1137}
1138
1139impl PartialEq for OwnedRow {
1142 #[inline]
1143 fn eq(&self, other: &Self) -> bool {
1144 self.row().eq(&other.row())
1145 }
1146}
1147
1148impl Eq for OwnedRow {}
1149
1150impl PartialOrd for OwnedRow {
1151 #[inline]
1152 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1153 Some(self.cmp(other))
1154 }
1155}
1156
1157impl Ord for OwnedRow {
1158 #[inline]
1159 fn cmp(&self, other: &Self) -> Ordering {
1160 self.row().cmp(&other.row())
1161 }
1162}
1163
1164impl Hash for OwnedRow {
1165 #[inline]
1166 fn hash<H: Hasher>(&self, state: &mut H) {
1167 self.row().hash(state)
1168 }
1169}
1170
1171impl AsRef<[u8]> for OwnedRow {
1172 #[inline]
1173 fn as_ref(&self) -> &[u8] {
1174 &self.data
1175 }
1176}
1177
1178#[inline]
1180fn null_sentinel(options: SortOptions) -> u8 {
1181 match options.nulls_first {
1182 true => 0,
1183 false => 0xFF,
1184 }
1185}
1186
1187enum LengthTracker {
1189 Fixed { length: usize, num_rows: usize },
1191 Variable {
1193 fixed_length: usize,
1194 lengths: Vec<usize>,
1195 },
1196}
1197
1198impl LengthTracker {
1199 fn new(num_rows: usize) -> Self {
1200 Self::Fixed {
1201 length: 0,
1202 num_rows,
1203 }
1204 }
1205
1206 fn push_fixed(&mut self, new_length: usize) {
1208 match self {
1209 LengthTracker::Fixed { length, .. } => *length += new_length,
1210 LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
1211 }
1212 }
1213
1214 fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
1216 match self {
1217 LengthTracker::Fixed { length, .. } => {
1218 *self = LengthTracker::Variable {
1219 fixed_length: *length,
1220 lengths: new_lengths.collect(),
1221 }
1222 }
1223 LengthTracker::Variable { lengths, .. } => {
1224 assert_eq!(lengths.len(), new_lengths.len());
1225 lengths
1226 .iter_mut()
1227 .zip(new_lengths)
1228 .for_each(|(length, new_length)| *length += new_length);
1229 }
1230 }
1231 }
1232
1233 fn materialized(&mut self) -> &mut [usize] {
1235 if let LengthTracker::Fixed { length, num_rows } = *self {
1236 *self = LengthTracker::Variable {
1237 fixed_length: length,
1238 lengths: vec![0; num_rows],
1239 };
1240 }
1241
1242 match self {
1243 LengthTracker::Variable { lengths, .. } => lengths,
1244 LengthTracker::Fixed { .. } => unreachable!(),
1245 }
1246 }
1247
1248 fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
1266 match self {
1267 LengthTracker::Fixed { length, num_rows } => {
1268 offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));
1269
1270 initial_offset + num_rows * length
1271 }
1272 LengthTracker::Variable {
1273 fixed_length,
1274 lengths,
1275 } => {
1276 let mut acc = initial_offset;
1277
1278 offsets.extend(lengths.iter().map(|length| {
1279 let current = acc;
1280 acc += length + fixed_length;
1281 current
1282 }));
1283
1284 acc
1285 }
1286 }
1287 }
1288}
1289
1290fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
1292 use fixed::FixedLengthEncoding;
1293
1294 let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
1295 let mut tracker = LengthTracker::new(num_rows);
1296
1297 for (array, encoder) in cols.iter().zip(encoders) {
1298 match encoder {
1299 Encoder::Stateless => {
1300 downcast_primitive_array! {
1301 array => tracker.push_fixed(fixed::encoded_len(array)),
1302 DataType::Null => {},
1303 DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
1304 DataType::Binary => tracker.push_variable(
1305 as_generic_binary_array::<i32>(array)
1306 .iter()
1307 .map(|slice| variable::encoded_len(slice))
1308 ),
1309 DataType::LargeBinary => tracker.push_variable(
1310 as_generic_binary_array::<i64>(array)
1311 .iter()
1312 .map(|slice| variable::encoded_len(slice))
1313 ),
1314 DataType::BinaryView => tracker.push_variable(
1315 array.as_binary_view()
1316 .iter()
1317 .map(|slice| variable::encoded_len(slice))
1318 ),
1319 DataType::Utf8 => tracker.push_variable(
1320 array.as_string::<i32>()
1321 .iter()
1322 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1323 ),
1324 DataType::LargeUtf8 => tracker.push_variable(
1325 array.as_string::<i64>()
1326 .iter()
1327 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1328 ),
1329 DataType::Utf8View => tracker.push_variable(
1330 array.as_string_view()
1331 .iter()
1332 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1333 ),
1334 DataType::FixedSizeBinary(len) => {
1335 let len = len.to_usize().unwrap();
1336 tracker.push_fixed(1 + len)
1337 }
1338 _ => unimplemented!("unsupported data type: {}", array.data_type()),
1339 }
1340 }
1341 Encoder::Dictionary(values, null) => {
1342 downcast_dictionary_array! {
1343 array => {
1344 tracker.push_variable(
1345 array.keys().iter().map(|v| match v {
1346 Some(k) => values.row(k.as_usize()).data.len(),
1347 None => null.data.len(),
1348 })
1349 )
1350 }
1351 _ => unreachable!(),
1352 }
1353 }
1354 Encoder::Struct(rows, null) => {
1355 let array = as_struct_array(array);
1356 tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
1357 true => 1 + rows.row(idx).as_ref().len(),
1358 false => 1 + null.data.len(),
1359 }));
1360 }
1361 Encoder::List(rows) => match array.data_type() {
1362 DataType::List(_) => {
1363 list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
1364 }
1365 DataType::LargeList(_) => {
1366 list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
1367 }
1368 _ => unreachable!(),
1369 },
1370 Encoder::RunEndEncoded(rows) => match array.data_type() {
1371 DataType::RunEndEncoded(r, _) => match r.data_type() {
1372 DataType::Int16 => run::compute_lengths(
1373 tracker.materialized(),
1374 rows,
1375 array.as_run::<Int16Type>(),
1376 ),
1377 DataType::Int32 => run::compute_lengths(
1378 tracker.materialized(),
1379 rows,
1380 array.as_run::<Int32Type>(),
1381 ),
1382 DataType::Int64 => run::compute_lengths(
1383 tracker.materialized(),
1384 rows,
1385 array.as_run::<Int64Type>(),
1386 ),
1387 _ => unreachable!("Unsupported run end index type: {r:?}"),
1388 },
1389 _ => unreachable!(),
1390 },
1391 }
1392 }
1393
1394 tracker
1395}
1396
1397fn encode_column(
1399 data: &mut [u8],
1400 offsets: &mut [usize],
1401 column: &dyn Array,
1402 opts: SortOptions,
1403 encoder: &Encoder<'_>,
1404) {
1405 match encoder {
1406 Encoder::Stateless => {
1407 downcast_primitive_array! {
1408 column => {
1409 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1410 fixed::encode(data, offsets, column.values(), nulls, opts)
1411 } else {
1412 fixed::encode_not_null(data, offsets, column.values(), opts)
1413 }
1414 }
1415 DataType::Null => {}
1416 DataType::Boolean => {
1417 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1418 fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
1419 } else {
1420 fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
1421 }
1422 }
1423 DataType::Binary => {
1424 variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
1425 }
1426 DataType::BinaryView => {
1427 variable::encode(data, offsets, column.as_binary_view().iter(), opts)
1428 }
1429 DataType::LargeBinary => {
1430 variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
1431 }
1432 DataType::Utf8 => variable::encode(
1433 data, offsets,
1434 column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
1435 opts,
1436 ),
1437 DataType::LargeUtf8 => variable::encode(
1438 data, offsets,
1439 column.as_string::<i64>()
1440 .iter()
1441 .map(|x| x.map(|x| x.as_bytes())),
1442 opts,
1443 ),
1444 DataType::Utf8View => variable::encode(
1445 data, offsets,
1446 column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
1447 opts,
1448 ),
1449 DataType::FixedSizeBinary(_) => {
1450 let array = column.as_any().downcast_ref().unwrap();
1451 fixed::encode_fixed_size_binary(data, offsets, array, opts)
1452 }
1453 _ => unimplemented!("unsupported data type: {}", column.data_type()),
1454 }
1455 }
1456 Encoder::Dictionary(values, nulls) => {
1457 downcast_dictionary_array! {
1458 column => encode_dictionary_values(data, offsets, column, values, nulls),
1459 _ => unreachable!()
1460 }
1461 }
1462 Encoder::Struct(rows, null) => {
1463 let array = as_struct_array(column);
1464 let null_sentinel = null_sentinel(opts);
1465 offsets
1466 .iter_mut()
1467 .skip(1)
1468 .enumerate()
1469 .for_each(|(idx, offset)| {
1470 let (row, sentinel) = match array.is_valid(idx) {
1471 true => (rows.row(idx), 0x01),
1472 false => (*null, null_sentinel),
1473 };
1474 let end_offset = *offset + 1 + row.as_ref().len();
1475 data[*offset] = sentinel;
1476 data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
1477 *offset = end_offset;
1478 })
1479 }
1480 Encoder::List(rows) => match column.data_type() {
1481 DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
1482 DataType::LargeList(_) => {
1483 list::encode(data, offsets, rows, opts, as_large_list_array(column))
1484 }
1485 _ => unreachable!(),
1486 },
1487 Encoder::RunEndEncoded(rows) => match column.data_type() {
1488 DataType::RunEndEncoded(r, _) => match r.data_type() {
1489 DataType::Int16 => {
1490 run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
1491 }
1492 DataType::Int32 => {
1493 run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
1494 }
1495 DataType::Int64 => {
1496 run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
1497 }
1498 _ => unreachable!("Unsupported run end index type: {r:?}"),
1499 },
1500 _ => unreachable!(),
1501 },
1502 }
1503}
1504
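/// Encode dictionary values not preserving the dictionary encoding: for each
/// key, the row encoding of the referenced value (or of a null value for null
/// keys) is written into `data`, advancing the corresponding entry in
/// `offsets`.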
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &DictionaryArray<K>,
    values: &Rows,
    null: &Row<'_>,
) {
1513 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1514 let row = match k {
1515 Some(k) => values.row(k.as_usize()).data,
1516 None => null.data,
1517 };
1518 let end_offset = *offset + row.len();
1519 data[*offset..end_offset].copy_from_slice(row);
1520 *offset = end_offset;
1521 }
1522}
1523
1524macro_rules! decode_primitive_helper {
1525 ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
1526 Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
1527 };
1528}
1529
1530unsafe fn decode_column(
1536 field: &SortField,
1537 rows: &mut [&[u8]],
1538 codec: &Codec,
1539 validate_utf8: bool,
1540) -> Result<ArrayRef, ArrowError> {
1541 let options = field.options;
1542
1543 let array: ArrayRef = match codec {
1544 Codec::Stateless => {
1545 let data_type = field.data_type.clone();
1546 downcast_primitive! {
1547 data_type => (decode_primitive_helper, rows, data_type, options),
1548 DataType::Null => Arc::new(NullArray::new(rows.len())),
1549 DataType::Boolean => Arc::new(decode_bool(rows, options)),
1550 DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
1551 DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
1552 DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
1553 DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
1554 DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
1555 DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
1556 DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
1557 _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type)))
1558 }
1559 }
1560 Codec::Dictionary(converter, _) => {
1561 let cols = converter.convert_raw(rows, validate_utf8)?;
1562 cols.into_iter().next().unwrap()
1563 }
1564 Codec::Struct(converter, _) => {
1565 let (null_count, nulls) = fixed::decode_nulls(rows);
1566 rows.iter_mut().for_each(|row| *row = &row[1..]);
1567 let children = converter.convert_raw(rows, validate_utf8)?;
1568
1569 let child_data = children.iter().map(|c| c.to_data()).collect();
1570 let builder = ArrayDataBuilder::new(field.data_type.clone())
1571 .len(rows.len())
1572 .null_count(null_count)
1573 .null_bit_buffer(Some(nulls))
1574 .child_data(child_data);
1575
1576 Arc::new(StructArray::from(builder.build_unchecked()))
1577 }
1578 Codec::List(converter) => match &field.data_type {
1579 DataType::List(_) => {
1580 Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
1581 }
1582 DataType::LargeList(_) => {
1583 Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
1584 }
1585 _ => unreachable!(),
1586 },
1587 Codec::RunEndEncoded(converter) => match &field.data_type {
1588 DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
1589 DataType::Int16 => Arc::new(run::decode::<Int16Type>(
1590 converter,
1591 rows,
1592 field,
1593 validate_utf8,
1594 )?),
1595 DataType::Int32 => Arc::new(run::decode::<Int32Type>(
1596 converter,
1597 rows,
1598 field,
1599 validate_utf8,
1600 )?),
1601 DataType::Int64 => Arc::new(run::decode::<Int64Type>(
1602 converter,
1603 rows,
1604 field,
1605 validate_utf8,
1606 )?),
1607 _ => unreachable!(),
1608 },
1609 _ => unreachable!(),
1610 },
1611 };
1612 Ok(array)
1613}
1614
1615#[cfg(test)]
1616mod tests {
1617 use rand::distr::uniform::SampleUniform;
1618 use rand::distr::{Distribution, StandardUniform};
1619 use rand::{rng, Rng};
1620
1621 use arrow_array::builder::*;
1622 use arrow_array::types::*;
1623 use arrow_array::*;
1624 use arrow_buffer::{i256, NullBuffer};
1625 use arrow_buffer::{Buffer, OffsetBuffer};
1626 use arrow_cast::display::{ArrayFormatter, FormatOptions};
1627 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1628
1629 use super::*;
1630
1631 #[test]
1632 fn test_fixed_width() {
1633 let cols = [
1634 Arc::new(Int16Array::from_iter([
1635 Some(1),
1636 Some(2),
1637 None,
1638 Some(-5),
1639 Some(2),
1640 Some(2),
1641 Some(0),
1642 ])) as ArrayRef,
1643 Arc::new(Float32Array::from_iter([
1644 Some(1.3),
1645 Some(2.5),
1646 None,
1647 Some(4.),
1648 Some(0.1),
1649 Some(-4.),
1650 Some(-0.),
1651 ])) as ArrayRef,
1652 ];
1653
1654 let converter = RowConverter::new(vec![
1655 SortField::new(DataType::Int16),
1656 SortField::new(DataType::Float32),
1657 ])
1658 .unwrap();
1659 let rows = converter.convert_columns(&cols).unwrap();
1660
1661 assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, 1, 191, 166, 102, 102,
                1, 128, 2, 1, 192, 32, 0, 0,
                0, 0, 0, 0, 0, 0, 0, 0,
                1, 127, 251, 1, 192, 128, 0, 0,
                1, 128, 2, 1, 189, 204, 204, 205,
                1, 128, 2, 1, 63, 127, 255, 255,
                1, 128, 0, 1, 127, 255, 255, 255
            ]
        );
1681
1682 assert!(rows.row(3) < rows.row(6));
1683 assert!(rows.row(0) < rows.row(1));
1684 assert!(rows.row(3) < rows.row(0));
1685 assert!(rows.row(4) < rows.row(1));
1686 assert!(rows.row(5) < rows.row(4));
1687
1688 let back = converter.convert_rows(&rows).unwrap();
1689 for (expected, actual) in cols.iter().zip(&back) {
1690 assert_eq!(expected, actual);
1691 }
1692 }
1693
1694 #[test]
1695 fn test_decimal128() {
1696 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1697 DECIMAL128_MAX_PRECISION,
1698 7,
1699 ))])
1700 .unwrap();
1701 let col = Arc::new(
1702 Decimal128Array::from_iter([
1703 None,
1704 Some(i128::MIN),
1705 Some(-13),
1706 Some(46_i128),
1707 Some(5456_i128),
1708 Some(i128::MAX),
1709 ])
1710 .with_precision_and_scale(38, 7)
1711 .unwrap(),
1712 ) as ArrayRef;
1713
1714 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1715 for i in 0..rows.num_rows() - 1 {
1716 assert!(rows.row(i) < rows.row(i + 1));
1717 }
1718
1719 let back = converter.convert_rows(&rows).unwrap();
1720 assert_eq!(back.len(), 1);
1721 assert_eq!(col.as_ref(), back[0].as_ref())
1722 }
1723
1724 #[test]
1725 fn test_decimal256() {
1726 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1727 DECIMAL256_MAX_PRECISION,
1728 7,
1729 ))])
1730 .unwrap();
1731 let col = Arc::new(
1732 Decimal256Array::from_iter([
1733 None,
1734 Some(i256::MIN),
1735 Some(i256::from_parts(0, -1)),
1736 Some(i256::from_parts(u128::MAX, -1)),
1737 Some(i256::from_parts(u128::MAX, 0)),
1738 Some(i256::from_parts(0, 46_i128)),
1739 Some(i256::from_parts(5, 46_i128)),
1740 Some(i256::MAX),
1741 ])
1742 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1743 .unwrap(),
1744 ) as ArrayRef;
1745
1746 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1747 for i in 0..rows.num_rows() - 1 {
1748 assert!(rows.row(i) < rows.row(i + 1));
1749 }
1750
1751 let back = converter.convert_rows(&rows).unwrap();
1752 assert_eq!(back.len(), 1);
1753 assert_eq!(col.as_ref(), back[0].as_ref())
1754 }
1755
1756 #[test]
1757 fn test_bool() {
1758 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1759
1760 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1761
1762 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1763 assert!(rows.row(2) > rows.row(1));
1764 assert!(rows.row(2) > rows.row(0));
1765 assert!(rows.row(1) > rows.row(0));
1766
1767 let cols = converter.convert_rows(&rows).unwrap();
1768 assert_eq!(&cols[0], &col);
1769
1770 let converter = RowConverter::new(vec![SortField::new_with_options(
1771 DataType::Boolean,
1772 SortOptions::default().desc().with_nulls_first(false),
1773 )])
1774 .unwrap();
1775
1776 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1777 assert!(rows.row(2) < rows.row(1));
1778 assert!(rows.row(2) < rows.row(0));
1779 assert!(rows.row(1) < rows.row(0));
1780 let cols = converter.convert_rows(&rows).unwrap();
1781 assert_eq!(&cols[0], &col);
1782 }
1783
1784 #[test]
1785 fn test_timezone() {
1786 let a =
1787 TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
1788 let d = a.data_type().clone();
1789
1790 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
1791 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
1792 let back = converter.convert_rows(&rows).unwrap();
1793 assert_eq!(back.len(), 1);
1794 assert_eq!(back[0].data_type(), &d);
1795
1796 let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
1798 a.append(34).unwrap();
1799 a.append_null();
1800 a.append(345).unwrap();
1801
1802 let dict = a.finish();
1804 let values = TimestampNanosecondArray::from(dict.values().to_data());
1805 let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
1806 let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
1807 let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
1808
1809 assert_eq!(dict_with_tz.data_type(), &d);
1810 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
1811 let rows = converter
1812 .convert_columns(&[Arc::new(dict_with_tz) as _])
1813 .unwrap();
1814 let back = converter.convert_rows(&rows).unwrap();
1815 assert_eq!(back.len(), 1);
1816 assert_eq!(back[0].data_type(), &v);
1817 }
1818
1819 #[test]
1820 fn test_null_encoding() {
1821 let col = Arc::new(NullArray::new(10));
1822 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1823 let rows = converter.convert_columns(&[col]).unwrap();
1824 assert_eq!(rows.num_rows(), 10);
1825 assert_eq!(rows.row(1).data.len(), 0);
1826 }
1827
1828 #[test]
1829 fn test_variable_width() {
1830 let col = Arc::new(StringArray::from_iter([
1831 Some("hello"),
1832 Some("he"),
1833 None,
1834 Some("foo"),
1835 Some(""),
1836 ])) as ArrayRef;
1837
1838 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1839 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1840
1841 assert!(rows.row(1) < rows.row(0));
1842 assert!(rows.row(2) < rows.row(4));
1843 assert!(rows.row(3) < rows.row(0));
1844 assert!(rows.row(3) < rows.row(1));
1845
1846 let cols = converter.convert_rows(&rows).unwrap();
1847 assert_eq!(&cols[0], &col);
1848
1849 let col = Arc::new(BinaryArray::from_iter([
1850 None,
1851 Some(vec![0_u8; 0]),
1852 Some(vec![0_u8; 6]),
1853 Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
1854 Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
1855 Some(vec![0_u8; variable::BLOCK_SIZE]),
1856 Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
1857 Some(vec![1_u8; 6]),
1858 Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
1859 Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
1860 Some(vec![1_u8; variable::BLOCK_SIZE]),
1861 Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
1862 Some(vec![0xFF_u8; 6]),
1863 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
1864 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
1865 Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
1866 Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
1867 ])) as ArrayRef;
1868
1869 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1870 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1871
1872 for i in 0..rows.num_rows() {
1873 for j in i + 1..rows.num_rows() {
1874 assert!(
1875 rows.row(i) < rows.row(j),
1876 "{} < {} - {:?} < {:?}",
1877 i,
1878 j,
1879 rows.row(i),
1880 rows.row(j)
1881 );
1882 }
1883 }
1884
1885 let cols = converter.convert_rows(&rows).unwrap();
1886 assert_eq!(&cols[0], &col);
1887
1888 let converter = RowConverter::new(vec![SortField::new_with_options(
1889 DataType::Binary,
1890 SortOptions::default().desc().with_nulls_first(false),
1891 )])
1892 .unwrap();
1893 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1894
1895 for i in 0..rows.num_rows() {
1896 for j in i + 1..rows.num_rows() {
1897 assert!(
1898 rows.row(i) > rows.row(j),
1899 "{} > {} - {:?} > {:?}",
1900 i,
1901 j,
1902 rows.row(i),
1903 rows.row(j)
1904 );
1905 }
1906 }
1907
1908 let cols = converter.convert_rows(&rows).unwrap();
1909 assert_eq!(&cols[0], &col);
1910 }
1911
1912 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
1914 match b.data_type() {
1915 DataType::Dictionary(_, v) => {
1916 assert_eq!(a.data_type(), v.as_ref());
1917 let b = arrow_cast::cast(b, v).unwrap();
1918 assert_eq!(a, b.as_ref())
1919 }
1920 _ => assert_eq!(a, b),
1921 }
1922 }
1923
1924 #[test]
1925 fn test_string_dictionary() {
1926 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
1927 Some("foo"),
1928 Some("hello"),
1929 Some("he"),
1930 None,
1931 Some("hello"),
1932 Some(""),
1933 Some("hello"),
1934 Some("hello"),
1935 ])) as ArrayRef;
1936
1937 let field = SortField::new(a.data_type().clone());
1938 let converter = RowConverter::new(vec![field]).unwrap();
1939 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1940
1941 assert!(rows_a.row(3) < rows_a.row(5));
1942 assert!(rows_a.row(2) < rows_a.row(1));
1943 assert!(rows_a.row(0) < rows_a.row(1));
1944 assert!(rows_a.row(3) < rows_a.row(0));
1945
1946 assert_eq!(rows_a.row(1), rows_a.row(4));
1947 assert_eq!(rows_a.row(1), rows_a.row(6));
1948 assert_eq!(rows_a.row(1), rows_a.row(7));
1949
1950 let cols = converter.convert_rows(&rows_a).unwrap();
1951 dictionary_eq(&cols[0], &a);
1952
1953 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
1954 Some("hello"),
1955 None,
1956 Some("cupcakes"),
1957 ])) as ArrayRef;
1958
1959 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
1960 assert_eq!(rows_a.row(1), rows_b.row(0));
1961 assert_eq!(rows_a.row(3), rows_b.row(1));
1962 assert!(rows_b.row(2) < rows_a.row(0));
1963
1964 let cols = converter.convert_rows(&rows_b).unwrap();
1965 dictionary_eq(&cols[0], &b);
1966
1967 let converter = RowConverter::new(vec![SortField::new_with_options(
1968 a.data_type().clone(),
1969 SortOptions::default().desc().with_nulls_first(false),
1970 )])
1971 .unwrap();
1972
1973 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1974 assert!(rows_c.row(3) > rows_c.row(5));
1975 assert!(rows_c.row(2) > rows_c.row(1));
1976 assert!(rows_c.row(0) > rows_c.row(1));
1977 assert!(rows_c.row(3) > rows_c.row(0));
1978
1979 let cols = converter.convert_rows(&rows_c).unwrap();
1980 dictionary_eq(&cols[0], &a);
1981
1982 let converter = RowConverter::new(vec![SortField::new_with_options(
1983 a.data_type().clone(),
1984 SortOptions::default().desc().with_nulls_first(true),
1985 )])
1986 .unwrap();
1987
1988 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1989 assert!(rows_c.row(3) < rows_c.row(5));
1990 assert!(rows_c.row(2) > rows_c.row(1));
1991 assert!(rows_c.row(0) > rows_c.row(1));
1992 assert!(rows_c.row(3) < rows_c.row(0));
1993
1994 let cols = converter.convert_rows(&rows_c).unwrap();
1995 dictionary_eq(&cols[0], &a);
1996 }
1997
1998 #[test]
1999 fn test_struct() {
2000 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2002 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2003 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2004 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2005 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2006
2007 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2008 let converter = RowConverter::new(sort_fields).unwrap();
2009 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2010
2011 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2012 assert!(a < b);
2013 }
2014
2015 let back = converter.convert_rows(&r1).unwrap();
2016 assert_eq!(back.len(), 1);
2017 assert_eq!(&back[0], &s1);
2018
2019 let data = s1
2021 .to_data()
2022 .into_builder()
2023 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2024 .null_count(2)
2025 .build()
2026 .unwrap();
2027
2028 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2029 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2));
        assert!(r2.row(0) < r2.row(1));
        assert_ne!(r1.row(0), r2.row(0));
        assert_eq!(r1.row(1), r2.row(1));

        let back = converter.convert_rows(&r2).unwrap();
2036 assert_eq!(back.len(), 1);
2037 assert_eq!(&back[0], &s2);
2038
2039 back[0].to_data().validate_full().unwrap();
2040 }
2041
2042 #[test]
2043 fn test_primitive_dictionary() {
2044 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2045 builder.append(2).unwrap();
2046 builder.append(3).unwrap();
2047 builder.append(0).unwrap();
2048 builder.append_null();
2049 builder.append(5).unwrap();
2050 builder.append(3).unwrap();
2051 builder.append(-1).unwrap();
2052
2053 let a = builder.finish();
2054 let data_type = a.data_type().clone();
2055 let columns = [Arc::new(a) as ArrayRef];
2056
2057 let field = SortField::new(data_type.clone());
2058 let converter = RowConverter::new(vec![field]).unwrap();
2059 let rows = converter.convert_columns(&columns).unwrap();
2060 assert!(rows.row(0) < rows.row(1));
2061 assert!(rows.row(2) < rows.row(0));
2062 assert!(rows.row(3) < rows.row(2));
2063 assert!(rows.row(6) < rows.row(2));
2064 assert!(rows.row(3) < rows.row(6));
2065 }
2066
2067 #[test]
2068 fn test_dictionary_nulls() {
2069 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2070 let keys =
2071 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2072
2073 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2074 let data = keys
2075 .into_builder()
2076 .data_type(data_type.clone())
2077 .child_data(vec![values])
2078 .build()
2079 .unwrap();
2080
2081 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2082 let field = SortField::new(data_type.clone());
2083 let converter = RowConverter::new(vec![field]).unwrap();
2084 let rows = converter.convert_columns(&columns).unwrap();
2085
2086 assert_eq!(rows.row(0), rows.row(1));
2087 assert_eq!(rows.row(3), rows.row(4));
2088 assert_eq!(rows.row(4), rows.row(5));
2089 assert!(rows.row(3) < rows.row(0));
2090 }
2091
2092 #[test]
2093 #[should_panic(expected = "Encountered non UTF-8 data")]
2094 fn test_invalid_utf8() {
2095 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2096 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2097 let rows = converter.convert_columns(&[array]).unwrap();
2098 let binary_row = rows.row(0);
2099
2100 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2101 let parser = converter.parser();
2102 let utf8_row = parser.parse(binary_row.as_ref());
2103
2104 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2105 }
2106
2107 #[test]
2108 #[should_panic(expected = "Encountered non UTF-8 data")]
2109 fn test_invalid_utf8_array() {
2110 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2111 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2112 let rows = converter.convert_columns(&[array]).unwrap();
2113 let binary_rows = rows.try_into_binary().expect("known-small rows");
2114
2115 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2116 let parsed = converter.from_binary(binary_rows);
2117
2118 converter.convert_rows(parsed.iter()).unwrap();
2119 }
2120
2121 #[test]
2122 #[should_panic(expected = "index out of bounds")]
2123 fn test_invalid_empty() {
2124 let binary_row: &[u8] = &[];
2125
2126 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2127 let parser = converter.parser();
2128 let utf8_row = parser.parse(binary_row.as_ref());
2129
2130 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2131 }
2132
2133 #[test]
2134 #[should_panic(expected = "index out of bounds")]
2135 fn test_invalid_empty_array() {
2136 let row: &[u8] = &[];
2137 let binary_rows = BinaryArray::from(vec![row]);
2138
2139 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2140 let parsed = converter.from_binary(binary_rows);
2141
2142 converter.convert_rows(parsed.iter()).unwrap();
2143 }
2144
2145 #[test]
2146 #[should_panic(expected = "index out of bounds")]
2147 fn test_invalid_truncated() {
2148 let binary_row: &[u8] = &[0x02];
2149
2150 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2151 let parser = converter.parser();
2152 let utf8_row = parser.parse(binary_row.as_ref());
2153
2154 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2155 }
2156
2157 #[test]
2158 #[should_panic(expected = "index out of bounds")]
2159 fn test_invalid_truncated_array() {
2160 let row: &[u8] = &[0x02];
2161 let binary_rows = BinaryArray::from(vec![row]);
2162
2163 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2164 let parsed = converter.from_binary(binary_rows);
2165
2166 converter.convert_rows(parsed.iter()).unwrap();
2167 }
2168
2169 #[test]
2170 #[should_panic(expected = "rows were not produced by this RowConverter")]
2171 fn test_different_converter() {
2172 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2173 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2174 let rows = converter.convert_columns(&[values]).unwrap();
2175
2176 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2177 let _ = converter.convert_rows(&rows);
2178 }
2179
2180 fn test_single_list<O: OffsetSizeTrait>() {
2181 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2182 builder.values().append_value(32);
2183 builder.values().append_value(52);
2184 builder.values().append_value(32);
2185 builder.append(true);
2186 builder.values().append_value(32);
2187 builder.values().append_value(52);
2188 builder.values().append_value(12);
2189 builder.append(true);
2190 builder.values().append_value(32);
2191 builder.values().append_value(52);
2192 builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(false);
2196 builder.values().append_value(32);
2197 builder.values().append_null();
2198 builder.append(true);
2199 builder.append(true);
2200
2201 let list = Arc::new(builder.finish()) as ArrayRef;
2202 let d = list.data_type().clone();
2203
2204 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2205
2206 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));

        let back = converter.convert_rows(&rows).unwrap();
2215 assert_eq!(back.len(), 1);
2216 back[0].to_data().validate_full().unwrap();
2217 assert_eq!(&back[0], &list);
2218
2219 let options = SortOptions::default().asc().with_nulls_first(false);
2220 let field = SortField::new_with_options(d.clone(), options);
2221 let converter = RowConverter::new(vec![field]).unwrap();
2222 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2223
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) > rows.row(5));

        let back = converter.convert_rows(&rows).unwrap();
2232 assert_eq!(back.len(), 1);
2233 back[0].to_data().validate_full().unwrap();
2234 assert_eq!(&back[0], &list);
2235
2236 let options = SortOptions::default().desc().with_nulls_first(false);
2237 let field = SortField::new_with_options(d.clone(), options);
2238 let converter = RowConverter::new(vec![field]).unwrap();
2239 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2240
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));

        let back = converter.convert_rows(&rows).unwrap();
2249 assert_eq!(back.len(), 1);
2250 back[0].to_data().validate_full().unwrap();
2251 assert_eq!(&back[0], &list);
2252
2253 let options = SortOptions::default().desc().with_nulls_first(true);
2254 let field = SortField::new_with_options(d, options);
2255 let converter = RowConverter::new(vec![field]).unwrap();
2256 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2257
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) < rows.row(5));

        let back = converter.convert_rows(&rows).unwrap();
2266 assert_eq!(back.len(), 1);
2267 back[0].to_data().validate_full().unwrap();
2268 assert_eq!(&back[0], &list);
2269 }
2270
2271 fn test_nested_list<O: OffsetSizeTrait>() {
2272 let mut builder =
2273 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
2274
2275 builder.values().values().append_value(1);
2276 builder.values().values().append_value(2);
2277 builder.values().append(true);
2278 builder.values().values().append_value(1);
2279 builder.values().values().append_null();
2280 builder.values().append(true);
2281 builder.append(true);
2282
2283 builder.values().values().append_value(1);
2284 builder.values().values().append_null();
2285 builder.values().append(true);
2286 builder.values().values().append_value(1);
2287 builder.values().values().append_null();
2288 builder.values().append(true);
2289 builder.append(true);
2290
2291 builder.values().values().append_value(1);
2292 builder.values().values().append_null();
2293 builder.values().append(true);
2294 builder.values().append(false);
2295 builder.append(true);
2296 builder.append(false);
2297
2298 builder.values().values().append_value(1);
2299 builder.values().values().append_value(2);
2300 builder.values().append(true);
2301 builder.append(true);
2302
2303 let list = Arc::new(builder.finish()) as ArrayRef;
2304 let d = list.data_type().clone();
2305
2306 let options = SortOptions::default().asc().with_nulls_first(true);
2314 let field = SortField::new_with_options(d.clone(), options);
2315 let converter = RowConverter::new(vec![field]).unwrap();
2316 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2317
2318 assert!(rows.row(0) > rows.row(1));
2319 assert!(rows.row(1) > rows.row(2));
2320 assert!(rows.row(2) > rows.row(3));
2321 assert!(rows.row(4) < rows.row(0));
2322 assert!(rows.row(4) > rows.row(1));
2323
2324 let back = converter.convert_rows(&rows).unwrap();
2325 assert_eq!(back.len(), 1);
2326 back[0].to_data().validate_full().unwrap();
2327 assert_eq!(&back[0], &list);
2328
2329 let options = SortOptions::default().desc().with_nulls_first(true);
2330 let field = SortField::new_with_options(d.clone(), options);
2331 let converter = RowConverter::new(vec![field]).unwrap();
2332 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2333
2334 assert!(rows.row(0) > rows.row(1));
2335 assert!(rows.row(1) > rows.row(2));
2336 assert!(rows.row(2) > rows.row(3));
2337 assert!(rows.row(4) > rows.row(0));
2338 assert!(rows.row(4) > rows.row(1));
2339
2340 let back = converter.convert_rows(&rows).unwrap();
2341 assert_eq!(back.len(), 1);
2342 back[0].to_data().validate_full().unwrap();
2343 assert_eq!(&back[0], &list);
2344
2345 let options = SortOptions::default().desc().with_nulls_first(false);
2346 let field = SortField::new_with_options(d, options);
2347 let converter = RowConverter::new(vec![field]).unwrap();
2348 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2349
2350 assert!(rows.row(0) < rows.row(1));
2351 assert!(rows.row(1) < rows.row(2));
2352 assert!(rows.row(2) < rows.row(3));
2353 assert!(rows.row(4) > rows.row(0));
2354 assert!(rows.row(4) < rows.row(1));
2355
2356 let back = converter.convert_rows(&rows).unwrap();
2357 assert_eq!(back.len(), 1);
2358 back[0].to_data().validate_full().unwrap();
2359 assert_eq!(&back[0], &list);
2360 }
2361
2362 #[test]
2363 fn test_list() {
2364 test_single_list::<i32>();
2365 test_nested_list::<i32>();
2366 }
2367
2368 #[test]
2369 fn test_large_list() {
2370 test_single_list::<i64>();
2371 test_nested_list::<i64>();
2372 }
2373
2374 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2375 where
2376 K: ArrowPrimitiveType,
2377 StandardUniform: Distribution<K::Native>,
2378 {
2379 let mut rng = rng();
2380 (0..len)
2381 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
2382 .collect()
2383 }
2384
2385 fn generate_strings<O: OffsetSizeTrait>(
2386 len: usize,
2387 valid_percent: f64,
2388 ) -> GenericStringArray<O> {
2389 let mut rng = rng();
2390 (0..len)
2391 .map(|_| {
2392 rng.random_bool(valid_percent).then(|| {
2393 let len = rng.random_range(0..100);
2394 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2395 String::from_utf8(bytes).unwrap()
2396 })
2397 })
2398 .collect()
2399 }
2400
2401 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2402 let mut rng = rng();
2403 (0..len)
2404 .map(|_| {
2405 rng.random_bool(valid_percent).then(|| {
2406 let len = rng.random_range(0..100);
2407 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2408 String::from_utf8(bytes).unwrap()
2409 })
2410 })
2411 .collect()
2412 }
2413
2414 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2415 let mut rng = rng();
2416 (0..len)
2417 .map(|_| {
2418 rng.random_bool(valid_percent).then(|| {
2419 let len = rng.random_range(0..100);
2420 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
2421 bytes
2422 })
2423 })
2424 .collect()
2425 }
2426
2427 fn generate_dictionary<K>(
2428 values: ArrayRef,
2429 len: usize,
2430 valid_percent: f64,
2431 ) -> DictionaryArray<K>
2432 where
2433 K: ArrowDictionaryKeyType,
2434 K::Native: SampleUniform,
2435 {
2436 let mut rng = rng();
2437 let min_key = K::Native::from_usize(0).unwrap();
2438 let max_key = K::Native::from_usize(values.len()).unwrap();
2439 let keys: PrimitiveArray<K> = (0..len)
2440 .map(|_| {
2441 rng.random_bool(valid_percent)
2442 .then(|| rng.random_range(min_key..max_key))
2443 })
2444 .collect();
2445
2446 let data_type =
2447 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
2448
2449 let data = keys
2450 .into_data()
2451 .into_builder()
2452 .data_type(data_type)
2453 .add_child_data(values.to_data())
2454 .build()
2455 .unwrap();
2456
2457 DictionaryArray::from(data)
2458 }
2459
2460 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2461 let mut rng = rng();
2462 let width = rng.random_range(0..20);
2463 let mut builder = FixedSizeBinaryBuilder::new(width);
2464
2465 let mut b = vec![0; width as usize];
2466 for _ in 0..len {
2467 match rng.random_bool(valid_percent) {
2468 true => {
2469 b.iter_mut().for_each(|x| *x = rng.random());
2470 builder.append_value(&b).unwrap();
2471 }
2472 false => builder.append_null(),
2473 }
2474 }
2475
2476 builder.finish()
2477 }
2478
    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
        let mut rng = rng();
        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
        let b = generate_strings::<i32>(len, valid_percent);
        let fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
        StructArray::new(fields, values, Some(nulls))
    }

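    /// Generates a `ListArray` with random offsets (lists of 0..10 elements),
    /// using `values` to produce the child array.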
    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
    where
        F: FnOnce(usize) -> ArrayRef,
    {
        let mut rng = rng();
        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
        let values_len = offsets.last().unwrap().to_usize().unwrap();
        let values = values(values_len);
        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
        ListArray::new(field, offsets, values, Some(nulls))
    }

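    /// Picks one of the supported data types at random and generates a column
    /// of that type with ~80% valid values.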
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..16) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            7 => Arc::new(generate_dictionary::<Int64Type>(
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            _ => unreachable!(),
        }
    }

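    /// Formats a single row of `cols` as a comma-separated string for use in
    /// assertion messages.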
    fn print_row(cols: &[SortColumn], row: usize) -> String {
        let t: Vec<_> = cols
            .iter()
            .map(|x| match x.values.is_valid(row) {
                true => {
                    let opts = FormatOptions::default().with_null("NULL");
                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
                    formatter.value(row).to_string()
                }
                false => "NULL".to_string(),
            })
            .collect();
        t.join(",")
    }

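    /// Formats the data types of `cols` as a comma-separated string.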
    fn print_col_types(cols: &[SortColumn]) -> String {
        let t: Vec<_> = cols
            .iter()
            .map(|x| x.values.data_type().to_string())
            .collect();
        t.join(",")
    }

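    /// Fuzz test: for random column sets, checks that comparing the encoded
    /// rows agrees with `LexicographicalComparator`, and that converting the
    /// rows back (including via the binary round-trip) reproduces the inputs.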
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }

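    /// Verifies that `Rows::clear` resets the buffer so it can be reused for
    /// a subsequent `append`.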
    #[test]
    fn test_clear() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let mut rows = converter.empty_rows(3, 128);

        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];

        for array in arrays.iter() {
            rows.clear();
            converter.append(&mut rows, &[array.clone()]).unwrap();
            let back = converter.convert_rows(&rows).unwrap();
            assert_eq!(&back[0], array);
        }

        let mut rows_expected = converter.empty_rows(3, 128);
        converter.append(&mut rows_expected, &arrays[1..]).unwrap();

        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
            assert_eq!(
                actual, expected,
                "For row {}: expected {:?}, actual: {:?}",
                i, expected, actual
            );
        }
    }

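    /// Verifies that `append` with a dictionary-encoded binary column
    /// round-trips through `convert_rows`.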
    #[test]
    fn test_append_codec_dictionary_binary() {
        use DataType::*;
        let converter = RowConverter::new(vec![SortField::new(Dictionary(
            Box::new(Int32),
            Box::new(Binary),
        ))])
        .unwrap();
        let mut rows = converter.empty_rows(4, 128);

        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
        let values = BinaryArray::from(vec![
            Some("a".as_bytes()),
            Some(b"b"),
            Some(b"c"),
            Some(b"d"),
        ]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));

        rows.clear();
        let array = Arc::new(dict_array) as ArrayRef;
        converter.append(&mut rows, &[array.clone()]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();

        dictionary_eq(&back[0], &array);
    }

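    /// A list that is a strict prefix of another must compare as less.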
    #[test]
    fn test_list_prefix() {
        let mut a = ListBuilder::new(Int8Builder::new());
        a.append_value([None]);
        a.append_value([None, None]);
        let a = a.finish();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
    }
}