use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use parking_lot::RwLock;

use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::filter_pushdown::{
    ChildFilterDescription, FilterDescription, FilterPushdownPhase,
};
use crate::limit::LimitStream;
use crate::metrics::{
    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
};
use crate::projection::{ProjectionExec, make_with_child, update_ordering};
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::spill::get_record_batch_memory_size;
use crate::spill::in_progress_spill_file::InProgressSpillFile;
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
use crate::stream::RecordBatchStreamAdapter;
use crate::stream::ReservationStream;
use crate::topk::TopK;
use crate::topk::TopKDynamicFilters;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
    Statistics,
};

use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
use datafusion_common::{
    DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
    unwrap_or_internal_err,
};
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};

use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};

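/// Execution metrics used by [`ExternalSorter`]: the shared baseline metrics
/// (output rows, elapsed compute, ...) plus spill-specific counters (spill
/// file count, spilled bytes and rows).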
struct ExternalSorterMetrics {
    baseline: BaselineMetrics,

    spill_metrics: SpillMetrics,
}

impl ExternalSorterMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            spill_metrics: SpillMetrics::new(metrics, partition),
        }
    }
}

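/// Sorts an arbitrary amount of input data, spilling sorted runs to disk when
/// they no longer fit within the configured memory budget.
///
/// Incoming batches are buffered in `in_mem_batches` under `reservation`.
/// When memory for a new batch cannot be reserved, the buffered batches are
/// sorted and streamed out to a spill file. At the end, `sort()` either sorts
/// the buffered batches directly or merges them together with the spilled
/// runs via [`StreamingMergeBuilder`].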
struct ExternalSorter {
    schema: SchemaRef,
    expr: LexOrdering,
    batch_size: usize,
    sort_in_place_threshold_bytes: usize,

    in_mem_batches: Vec<RecordBatch>,

    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    finished_spill_files: Vec<SortedSpillFile>,

    metrics: ExternalSorterMetrics,
    runtime: Arc<RuntimeEnv>,
    reservation: MemoryReservation,
    spill_manager: SpillManager,

    merge_reservation: MemoryReservation,
    sort_spill_reservation_bytes: usize,
}

impl ExternalSorter {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        spill_compression: SpillCompression,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<Self> {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);

        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);

        let spill_manager = SpillManager::new(
            Arc::clone(&runtime),
            metrics.spill_metrics.clone(),
            Arc::clone(&schema),
        )
        .with_compression_type(spill_compression);

        Ok(Self {
            schema,
            in_mem_batches: vec![],
            in_progress_spill_file: None,
            finished_spill_files: vec![],
            expr,
            metrics,
            reservation,
            spill_manager,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        })
    }

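    /// Appends an unsorted batch to `in_mem_batches`, reserving memory for it
    /// first and spilling already-buffered data if the reservation cannot
    /// grow. Empty batches are ignored.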
    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
        if input.num_rows() == 0 {
            return Ok(());
        }

        self.reserve_memory_for_merge()?;
        self.reserve_memory_for_batch_and_maybe_spill(&input)
            .await?;

        self.in_mem_batches.push(input);
        Ok(())
    }

    fn spilled_before(&self) -> bool {
        !self.finished_spill_files.is_empty()
    }

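    /// Returns the final sorted output of all batches inserted so far.
    ///
    /// If data was spilled, any remaining in-memory batches are sorted and
    /// spilled as well, and all spill files are streamed back and merged;
    /// otherwise the in-memory batches are sorted directly.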
    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
        self.merge_reservation.free();

        if self.spilled_before() {
            if !self.in_mem_batches.is_empty() {
                self.sort_and_spill_in_mem_batches().await?;
            }

            StreamingMergeBuilder::new()
                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
                .with_spill_manager(self.spill_manager.clone())
                .with_schema(Arc::clone(&self.schema))
                .with_expressions(&self.expr.clone())
                .with_metrics(self.metrics.baseline.clone())
                .with_batch_size(self.batch_size)
                .with_fetch(None)
                .with_reservation(self.merge_reservation.new_empty())
                .build()
        } else {
            self.in_mem_sort_stream(self.metrics.baseline.clone())
        }
    }

    fn used(&self) -> usize {
        self.reservation.size()
    }

    fn spilled_bytes(&self) -> usize {
        self.metrics.spill_metrics.spilled_bytes.value()
    }

    fn spilled_rows(&self) -> usize {
        self.metrics.spill_metrics.spilled_rows.value()
    }

    fn spill_count(&self) -> usize {
        self.metrics.spill_metrics.spill_file_count.value()
    }

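    /// Appends `globally_sorted_batches` to the in-progress spill file,
    /// consuming the vector and freeing the associated memory reservation.
    /// StringView columns are compacted first so that only referenced string
    /// data is written to disk.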
    async fn consume_and_spill_append(
        &mut self,
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        if globally_sorted_batches.is_empty() {
            return Ok(());
        }

        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
        }

        Self::organize_stringview_arrays(globally_sorted_batches)?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let batches_to_spill = std::mem::take(globally_sorted_batches);
        self.reservation.free();

        let (in_progress_file, max_record_batch_size) =
            self.in_progress_spill_file.as_mut().ok_or_else(|| {
                internal_datafusion_err!("In-progress spill file should be initialized")
            })?;

        for batch in batches_to_spill {
            in_progress_file.append_batch(&batch)?;

            *max_record_batch_size =
                (*max_record_batch_size).max(batch.get_sliced_size()?);
        }

        assert_or_internal_err!(
            globally_sorted_batches.is_empty(),
            "This function consumes globally_sorted_batches, so it should be empty after taking."
        );

        Ok(())
    }

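    /// Finalizes the in-progress spill file and records it (together with the
    /// largest batch size written to it) in `finished_spill_files`.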
    async fn spill_finish(&mut self) -> Result<()> {
        let (mut in_progress_file, max_record_batch_memory) =
            self.in_progress_spill_file.take().ok_or_else(|| {
                internal_datafusion_err!("Should be called after `spill_append`")
            })?;
        let spill_file = in_progress_file.finish()?;

        if let Some(spill_file) = spill_file {
            self.finished_spill_files.push(SortedSpillFile {
                file: spill_file,
                max_record_batch_memory,
            });
        }

        Ok(())
    }

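    /// Rewrites every `StringViewArray` column in the given batches with
    /// [`StringViewArray::gc`], which compacts the underlying data buffers so
    /// that views reference only the strings actually in use. After sorting,
    /// views may point into large shared buffers in arbitrary order; without
    /// this compaction, spilling would write those buffers to disk mostly
    /// unchanged.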
    fn organize_stringview_arrays(
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

        for batch in globally_sorted_batches.drain(..) {
            let mut new_columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(batch.num_columns());

            let mut arr_mutated = false;
            for array in batch.columns() {
                if let Some(string_view_array) =
                    array.as_any().downcast_ref::<StringViewArray>()
                {
                    let new_array = string_view_array.gc();
                    new_columns.push(Arc::new(new_array));
                    arr_mutated = true;
                } else {
                    new_columns.push(Arc::clone(array));
                }
            }

            let organized_batch = if arr_mutated {
                RecordBatch::try_new(batch.schema(), new_columns)?
            } else {
                batch
            };

            organized_batches.push(organized_batch);
        }

        *globally_sorted_batches = organized_batches;

        Ok(())
    }

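    /// Sorts the buffered in-memory batches and writes them to a new spill
    /// file, draining `in_mem_batches` in the process.
    ///
    /// Sorted batches are re-buffered as long as memory can be reserved for
    /// them; once a reservation fails, everything buffered so far is appended
    /// to the spill file, so a single sorted run may be written in several
    /// chunks.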
    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
        assert_or_internal_err!(
            !self.in_mem_batches.is_empty(),
            "in_mem_batches must not be empty when attempting to sort and spill"
        );

        self.merge_reservation.free();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
        assert_or_internal_err!(
            self.in_mem_batches.is_empty(),
            "in_mem_batches should be empty after constructing sorted stream"
        );
        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
            if self.reservation.try_grow(sorted_size).is_err() {
                globally_sorted_batches.push(batch);
                self.consume_and_spill_append(&mut globally_sorted_batches)
                    .await?;
            } else {
                globally_sorted_batches.push(batch);
            }
        }

        drop(sorted_stream);

        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?;
        self.spill_finish().await?;

        let buffers_cleared_property =
            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
        assert_or_internal_err!(
            buffers_cleared_property,
            "in_mem_batches and globally_sorted_batches should be cleared before"
        );

        self.reserve_memory_for_merge()?;

        Ok(())
    }

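    /// Consumes `in_mem_batches` and returns a stream of sorted batches.
    ///
    /// A single buffered batch is sorted directly. If the reserved memory is
    /// below `sort_in_place_threshold_bytes`, all batches are concatenated
    /// and sorted as one. Otherwise each batch is sorted individually and the
    /// per-batch streams are merged with [`StreamingMergeBuilder`].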
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }

        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();

        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            self.reservation
                .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
                .map_err(Self::err_with_oom_context)?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                let reservation = self
                    .reservation
                    .split(get_reserved_bytes_for_record_batch(&batch)?);
                let input = self.sort_batch_stream(batch, &metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;

        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(&self.expr.clone())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(None)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }

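    /// Sorts a single batch into a stream of `batch_size`-row batches, taking
    /// ownership of `reservation` (which must match the batch's reserved
    /// size) and releasing the memory incrementally as sorted batches are
    /// consumed, via [`ReservationStream`].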
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: &BaselineMetrics,
        mut reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(
            get_reserved_bytes_for_record_batch(&batch)?,
            reservation.size()
        );

        let schema = batch.schema();
        let expressions = self.expr.clone();
        let batch_size = self.batch_size;
        let output_row_metrics = metrics.output_rows().clone();

        let stream = futures::stream::once(async move {
            let schema = batch.schema();

            let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
            drop(batch);

            reservation.free();

            Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation))
        })
        .then({
            move |batches| async move {
                match batches {
                    Ok((schema, sorted_batches, mut reservation)) => {
                        let total_sorted_size: usize = sorted_batches
                            .iter()
                            .map(get_record_batch_memory_size)
                            .sum();
                        reservation
                            .try_grow(total_sorted_size)
                            .map_err(Self::err_with_oom_context)?;

                        Ok(Box::pin(ReservationStream::new(
                            Arc::clone(&schema),
                            Box::pin(RecordBatchStreamAdapter::new(
                                schema,
                                futures::stream::iter(sorted_batches.into_iter().map(Ok)),
                            )),
                            reservation,
                        )) as SendableRecordBatchStream)
                    }
                    Err(e) => Err(e),
                }
            }
        })
        .try_flatten()
        .map(move |batch| match batch {
            Ok(batch) => {
                output_row_metrics.add(batch.num_rows());
                Ok(batch)
            }
            Err(e) => Err(e),
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

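    /// Reserves `sort_spill_reservation_bytes` in `merge_reservation` so the
    /// merge phase has memory available. Skipped when temporary files are
    /// disabled, since no spilling (and hence no merge of spill files) can
    /// happen in that case.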
    fn reserve_memory_for_merge(&mut self) -> Result<()> {
        if self.runtime.disk_manager.tmp_files_enabled() {
            let size = self.sort_spill_reservation_bytes;
            if self.merge_reservation.size() != size {
                self.merge_reservation
                    .try_resize(size)
                    .map_err(Self::err_with_oom_context)?;
            }
        }

        Ok(())
    }

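    /// Reserves memory for `input`. If the reservation fails and there is
    /// buffered data, the buffered batches are sorted and spilled and the
    /// reservation is retried once; otherwise the error is returned with
    /// extra out-of-memory context.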
    async fn reserve_memory_for_batch_and_maybe_spill(
        &mut self,
        input: &RecordBatch,
    ) -> Result<()> {
        let size = get_reserved_bytes_for_record_batch(input)?;

        match self.reservation.try_grow(size) {
            Ok(_) => Ok(()),
            Err(e) => {
                if self.in_mem_batches.is_empty() {
                    return Err(Self::err_with_oom_context(e));
                }

                self.sort_and_spill_in_mem_batches().await?;
                self.reservation
                    .try_grow(size)
                    .map_err(Self::err_with_oom_context)
            }
        }
    }

    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
        match e {
            DataFusionError::ResourcesExhausted(_) => e.context(
                "Not enough memory to continue external sort. \
                Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
            ),
            _ => e,
        }
    }
}

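/// Estimates the memory to reserve for a batch inside the sorter: the sum of
/// the batch's in-memory size and its sliced size. This appears intended to
/// leave headroom for the reordered copy created while sorting the batch, in
/// addition to the batch itself.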
pub(crate) fn get_reserved_bytes_for_record_batch_size(
    record_batch_size: usize,
    sliced_size: usize,
) -> usize {
    record_batch_size + sliced_size
}

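/// Convenience wrapper computing the reservation estimate directly from a
/// [`RecordBatch`], combining [`get_record_batch_memory_size`] with the
/// batch's `get_sliced_size`.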
pub(super) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result<usize> {
    Ok(get_reserved_bytes_for_record_batch_size(
        get_record_batch_memory_size(batch),
        batch.get_sliced_size()?,
    ))
}

impl Debug for ExternalSorter {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("ExternalSorter")
            .field("memory_used", &self.used())
            .field("spilled_bytes", &self.spilled_bytes())
            .field("spilled_rows", &self.spilled_rows())
            .field("spill_count", &self.spill_count())
            .finish()
    }
}

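/// Sorts a single [`RecordBatch`] by the given expressions, optionally
/// keeping only the first `fetch` rows.
///
/// A minimal usage sketch (not compiled as a doctest; `col` and the sort
/// types are the same helpers used in this module's tests):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::array::{Int32Array, RecordBatch};
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(
///     Arc::clone(&schema),
///     vec![Arc::new(Int32Array::from(vec![3, 1, 2]))],
/// )?;
/// let ordering: LexOrdering =
///     [PhysicalSortExpr::new_default(col("a", &schema)?)].into();
/// let sorted = sort_batch(&batch, &ordering, None)?; // rows become 1, 2, 3
/// ```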
pub fn sort_batch(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    fetch: Option<usize>,
) -> Result<RecordBatch> {
    let sort_columns = expressions
        .iter()
        .map(|expr| expr.evaluate_to_sort_column(batch))
        .collect::<Result<Vec<_>>>()?;

    let indices = lexsort_to_indices(&sort_columns, fetch)?;
    let columns = take_arrays(batch.columns(), &indices, None)?;

    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
    Ok(RecordBatch::try_new_with_options(
        batch.schema(),
        columns,
        &options,
    )?)
}

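/// Like [`sort_batch`], but emits the sorted result as a sequence of batches
/// of at most `batch_size` rows instead of one large batch.
///
/// The sort indices are computed once; each output chunk is then produced by
/// `take`-ing a `batch_size`-sized slice of those indices, so an input of
/// `n` rows yields `ceil(n / batch_size)` batches (and an empty input yields
/// none). For example (mirroring the test below):
///
/// ```ignore
/// // 1000 input rows with batch_size = 250 produce ceil(1000 / 250) = 4 chunks
/// let chunks = sort_batch_chunked(&batch, &ordering, 250)?;
/// assert_eq!(chunks.len(), 4);
/// ```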
pub fn sort_batch_chunked(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    batch_size: usize,
) -> Result<Vec<RecordBatch>> {
    let sort_columns = expressions
        .iter()
        .map(|expr| expr.evaluate_to_sort_column(batch))
        .collect::<Result<Vec<_>>>()?;

    let indices = lexsort_to_indices(&sort_columns, None)?;

    let num_rows = indices.len();
    let num_chunks = num_rows.div_ceil(batch_size);

    let result_batches = (0..num_chunks)
        .map(|chunk_idx| {
            let start = chunk_idx * batch_size;
            let end = (start + batch_size).min(num_rows);
            let chunk_len = end - start;

            let chunk_indices = indices.slice(start, chunk_len);

            let columns = take_arrays(batch.columns(), &chunk_indices, None)?;

            let options = RecordBatchOptions::new().with_row_count(Some(chunk_len));
            let chunk_batch =
                RecordBatch::try_new_with_options(batch.schema(), columns, &options)?;

            Ok(chunk_batch)
        })
        .collect::<Result<Vec<RecordBatch>>>()?;

    Ok(result_batches)
}

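/// Sort execution plan: sorts its input by `expr`, spilling to disk if the
/// data does not fit in memory. With a `fetch` limit it runs as a TopK
/// operator instead, optionally publishing a dynamic filter that lets
/// upstream operators skip rows that cannot appear in the top `fetch`.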
#[derive(Debug, Clone)]
pub struct SortExec {
    pub(crate) input: Arc<dyn ExecutionPlan>,
    expr: LexOrdering,
    metrics_set: ExecutionPlanMetricsSet,
    preserve_partitioning: bool,
    fetch: Option<usize>,
    common_sort_prefix: Vec<PhysicalSortExpr>,
    cache: PlanProperties,
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}

impl SortExec {
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let preserve_partitioning = false;
        let (cache, sort_prefix) =
            Self::compute_properties(&input, expr.clone(), preserve_partitioning)
                .unwrap();
        Self {
            expr,
            input,
            metrics_set: ExecutionPlanMetricsSet::new(),
            preserve_partitioning,
            fetch: None,
            common_sort_prefix: sort_prefix,
            cache,
            filter: None,
        }
    }

    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        self.cache = self
            .cache
            .with_partitioning(Self::output_partitioning_helper(
                &self.input,
                self.preserve_partitioning,
            ));
        self
    }

    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
        let children = self
            .expr
            .iter()
            .map(|sort_expr| Arc::clone(&sort_expr.expr))
            .collect::<Vec<_>>();
        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(children, lit(true)),
        ))))
    }

    fn cloned(&self) -> Self {
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            common_sort_prefix: self.common_sort_prefix.clone(),
            fetch: self.fetch,
            cache: self.cache.clone(),
            filter: self.filter.clone(),
        }
    }

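    /// Returns a copy of this plan with the given `fetch` limit.
    ///
    /// Setting a fetch turns the sort into a TopK: if the plan emits rows
    /// incrementally, the output also becomes bounded, and a dynamic TopK
    /// filter is created (or reused) for predicate pushdown. A `None` fetch
    /// drops any existing filter.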
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = self.cache.clone();
        let is_pipeline_friendly = matches!(
            self.cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        let filter = fetch.is_some().then(|| {
            self.filter.clone().unwrap_or_else(|| self.create_filter())
        });
        let mut new_sort = self.cloned();
        new_sort.fetch = fetch;
        new_sort.cache = cache;
        new_sort.filter = filter;
        new_sort
    }

    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn output_partitioning_helper(
        input: &Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Partitioning {
        if preserve_partitioning {
            input.output_partitioning().clone()
        } else {
            Partitioning::UnknownPartitioning(1)
        }
    }

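    /// Computes the plan properties for a sort over `input`, along with the
    /// common prefix of `sort_exprs` that the input already satisfies.
    ///
    /// If the input is already fully sorted, the sort is pass-through and
    /// inherits the input's emission type and boundedness; otherwise the sort
    /// emits only at the end, and an unbounded input would require infinite
    /// memory.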
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
        let (sort_prefix, sort_satisfied) = input
            .equivalence_properties()
            .extract_common_sort_prefix(sort_exprs.clone())?;

        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };

        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };

        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.reorder(sort_exprs)?;

        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);

        Ok((
            PlanProperties::new(
                eq_properties,
                output_partitioning,
                emission_type,
                boundedness,
            ),
            sort_prefix,
        ))
    }
}

impl DisplayAs for SortExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let preserve_partitioning = self.preserve_partitioning;
                match self.fetch {
                    Some(fetch) => {
                        write!(
                            f,
                            "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                            self.expr
                        )?;
                        if let Some(filter) = &self.filter
                            && let Ok(current) = filter.read().expr().current()
                            && !current.eq(&lit(true))
                        {
                            write!(f, ", filter=[{current}]")?;
                        }
                        if !self.common_sort_prefix.is_empty() {
                            write!(f, ", sort_prefix=[")?;
                            let mut first = true;
                            for sort_expr in &self.common_sort_prefix {
                                if first {
                                    first = false;
                                } else {
                                    write!(f, ", ")?;
                                }
                                write!(f, "{sort_expr}")?;
                            }
                            write!(f, "]")
                        } else {
                            Ok(())
                        }
                    }
                    None => write!(
                        f,
                        "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                        self.expr
                    ),
                }
            }
            DisplayFormatType::TreeRender => match self.fetch {
                Some(fetch) => {
                    writeln!(f, "{}", self.expr)?;
                    writeln!(f, "limit={fetch}")
                }
                None => {
                    writeln!(f, "{}", self.expr)
                }
            },
        }
    }
}

impl ExecutionPlan for SortExec {
    fn name(&self) -> &'static str {
        match self.fetch {
            Some(_) => "SortExec(TopK)",
            None => "SortExec",
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut new_sort = self.cloned();
        assert_eq!(children.len(), 1, "SortExec should have exactly one child");
        new_sort.input = Arc::clone(&children[0]);
        let (cache, sort_prefix) = Self::compute_properties(
            &new_sort.input,
            new_sort.expr.clone(),
            new_sort.preserve_partitioning,
        )?;
        new_sort.cache = cache;
        new_sort.common_sort_prefix = sort_prefix;

        Ok(Arc::new(new_sort))
    }

    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        let new_sort = self.with_new_children(children)?;
        let mut new_sort = new_sort
            .as_any()
            .downcast_ref::<SortExec>()
            .expect("cloned 1 lines above this line, we know the type")
            .clone();
        new_sort.filter = Some(new_sort.create_filter());
        new_sort.metrics_set = ExecutionPlanMetricsSet::new();

        Ok(Arc::new(new_sort))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start SortExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {partition}");

        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy(self.expr.clone())?;

        match (sort_satisfied, self.fetch.as_ref()) {
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            (true, None) => Ok(input),
            (false, Some(fetch)) => {
                let filter = self.filter.clone();
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.common_sort_prefix.clone(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                    Arc::clone(&unwrap_or_internal_err!(filter)),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                            if topk.finished {
                                break;
                            }
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    context.session_config().spill_compression(),
                    &self.metrics_set,
                    context.runtime_env(),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort().await
                    })
                    .try_flatten(),
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if !self.preserve_partitioning() {
            return self
                .input
                .partition_statistics(None)?
                .with_fetch(self.fetch, 0, 1);
        }
        self.input
            .partition_statistics(partition)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.fetch.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }

        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
        else {
            return Ok(None);
        };

        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }

    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &datafusion_common::config::ConfigOptions,
    ) -> Result<FilterDescription> {
        if !matches!(phase, FilterPushdownPhase::Post) {
            return FilterDescription::from_children(parent_filters, &self.children());
        }

        let mut child =
            ChildFilterDescription::from_child(&parent_filters, self.input())?;

        if let Some(filter) = &self.filter
            && config.optimizer.enable_topk_dynamic_filter_pushdown
        {
            child = child.with_self_filter(filter.read().expr());
        }

        Ok(FilterDescription::new().with_child(child))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::*;
    use crate::coalesce_partitions::CoalescePartitionsExec;
    use crate::collect;
    use crate::execution_plan::Boundedness;
    use crate::expressions::col;
    use crate::test;
    use crate::test::TestMemoryExec;
    use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
    use crate::test::{assert_is_pending, make_partition};

    use arrow::array::*;
    use arrow::compute::SortOptions;
    use arrow::datatypes::*;
    use datafusion_common::cast::as_primitive_array;
    use datafusion_common::test_util::batches_to_string;
    use datafusion_common::{DataFusionError, Result, ScalarValue};
    use datafusion_execution::RecordBatchStream;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    use datafusion_physical_expr::EquivalenceProperties;
    use datafusion_physical_expr::expressions::{Column, Literal};

    use futures::{FutureExt, Stream};
    use insta::assert_snapshot;

    #[derive(Debug, Clone)]
    pub struct SortedUnboundedExec {
        schema: Schema,
        batch_size: u64,
        cache: PlanProperties,
    }

    impl DisplayAs for SortedUnboundedExec {
        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
            match t {
                DisplayFormatType::Default
                | DisplayFormatType::Verbose
                | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
            }
            Ok(())
        }
    }

    impl SortedUnboundedExec {
        fn compute_properties(schema: SchemaRef) -> PlanProperties {
            let mut eq_properties = EquivalenceProperties::new(schema);
            eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
                Column::new("c1", 0),
            ))]);
            PlanProperties::new(
                eq_properties,
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            )
        }
    }

    impl ExecutionPlan for SortedUnboundedExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.cache
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            Ok(Box::pin(SortedUnboundedStream {
                schema: Arc::new(self.schema.clone()),
                batch_size: self.batch_size,
                offset: 0,
            }))
        }
    }

    #[derive(Debug)]
    pub struct SortedUnboundedStream {
        schema: SchemaRef,
        batch_size: u64,
        offset: u64,
    }

    impl Stream for SortedUnboundedStream {
        type Item = Result<RecordBatch>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            let batch = SortedUnboundedStream::create_record_batch(
                Arc::clone(&self.schema),
                self.offset,
                self.batch_size,
            );
            self.offset += self.batch_size;
            Poll::Ready(Some(Ok(batch)))
        }
    }

    impl RecordBatchStream for SortedUnboundedStream {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }

    impl SortedUnboundedStream {
        fn create_record_batch(
            schema: SchemaRef,
            offset: u64,
            batch_size: u64,
        ) -> RecordBatch {
            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
            let array = UInt64Array::from(values);
            let array_ref: ArrayRef = Arc::new(array);
            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
        }
    }

    #[tokio::test]
    async fn test_in_mem_sort() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let partitions = 4;
        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(csv)),
        ));

        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 400);
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_spill() -> Result<()> {
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        let partitions = 100;
        let input = test::scan_partitioned(partitions);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        assert_eq!(result.len(), 2);

        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 10000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();
        assert!((3..=10).contains(&spill_count));
        assert!((9000..=10000).contains(&spilled_rows));
        assert!((38000..=44000).contains(&spilled_bytes));

        let columns = result[0].columns();

        let i = as_primitive_array::<Int32Type>(&columns[0])?;
        assert_eq!(i.value(0), 0);
        assert_eq!(i.value(i.len() - 1), 81);
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_batch_reservation_error() -> Result<()> {
        let merge_reservation: usize = 0;
        let session_config =
            SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);

        let plan = test::scan_partitioned(1);

        let expected_batch_reservation = {
            let temp_ctx = Arc::new(TaskContext::default());
            let mut stream = plan.execute(0, Arc::clone(&temp_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            get_reserved_bytes_for_record_batch(&first_batch)?
        };

        let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;

        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(memory_limit, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        {
            let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            let batch_reservation = get_reserved_bytes_for_record_batch(&first_batch)?;

            assert_eq!(batch_reservation, expected_batch_reservation);
            assert!(memory_limit < (merge_reservation + batch_reservation));
        }

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
            plan,
        ));

        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;

        let err = result.unwrap_err();
        assert!(
            matches!(err, DataFusionError::Context(..)),
            "Assertion failed: expected a Context error, but got: {err:?}"
        );

        assert!(
            matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
            "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_spill_utf8_strings() -> Result<()> {
        let session_config = SessionConfig::new()
            .with_batch_size(100)
            .with_sort_in_place_threshold_bytes(20 * 1024)
            .with_sort_spill_reservation_bytes(100 * 1024);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(500 * 1024, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        let input = test::scan_partitioned_utf8(200);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;

        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
        assert_eq!(num_rows, 20000);

        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 20000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();

        assert!((4..=8).contains(&spill_count));
        assert!((15000..=20000).contains(&spilled_rows));
        assert!((900000..=1000000).contains(&spilled_bytes));

        let concated_result = concat_batches(&schema, &result)?;
        let columns = concated_result.columns();
        let string_array = as_string_array(&columns[0]);
        for i in 0..string_array.len() - 1 {
            assert!(string_array.value(i) <= string_array.value(i + 1));
        }

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_fetch_memory_calculation() -> Result<()> {
        let avg_batch_size = 400;
        let partitions = 4;

        let test_options = vec![
            (None, true),
            (Some(1), false),
        ];

        for (fetch, expect_spillage) in test_options {
            let session_config = SessionConfig::new();
            let sort_spill_reservation_bytes = session_config
                .options()
                .execution
                .sort_spill_reservation_bytes;

            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(
                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                    1.0,
                )
                .build_arc()?;
            let task_ctx = Arc::new(
                TaskContext::default()
                    .with_runtime(runtime)
                    .with_session_config(session_config),
            );

            let csv = test::scan_partitioned(partitions);
            let schema = csv.schema();

            let sort_exec = Arc::new(
                SortExec::new(
                    [PhysicalSortExpr {
                        expr: col("i", &schema)?,
                        options: SortOptions::default(),
                    }]
                    .into(),
                    Arc::new(CoalescePartitionsExec::new(csv)),
                )
                .with_fetch(fetch),
            );

            let result =
                collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
            assert_eq!(result.len(), 1);

            let metrics = sort_exec.metrics().unwrap();
            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_sort_memory_reduction_per_batch() -> Result<()> {
        let batch_size = 50;
        let num_rows = 1000;
        let task_ctx = Arc::new(
            TaskContext::default().with_session_config(
                SessionConfig::new()
                    .with_batch_size(batch_size)
                    .with_sort_in_place_threshold_bytes(usize::MAX),
            ),
        );

        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        let mut values: Vec<i32> = (0..num_rows).collect();
        values.reverse();

        let input_batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values))],
        )?;

        let batches = vec![input_batch];

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: Arc::new(Column::new("a", 0)),
                options: SortOptions::default(),
            }]
            .into(),
            TestMemoryExec::try_new_exec(
                std::slice::from_ref(&batches),
                Arc::clone(&schema),
                None,
            )?,
        ));

        let mut stream = sort_exec.execute(0, Arc::clone(&task_ctx))?;

        let mut previous_reserved = task_ctx.runtime_env().memory_pool.reserved();
        let mut batch_count = 0;

        while let Some(result) = stream.next().await {
            let batch = result?;
            batch_count += 1;

            assert!(batch.num_rows() > 0, "Batch should not be empty");

            let current_reserved = task_ctx.runtime_env().memory_pool.reserved();

            if batch_count > 1 {
                assert!(
                    current_reserved <= previous_reserved,
                    "Memory reservation should decrease or stay same as batches are emitted. \
                    Batch {batch_count}: previous={previous_reserved}, current={current_reserved}"
                );
            }

            previous_reserved = current_reserved;
        }

        assert!(
            batch_count > 1,
            "Expected multiple batches to be emitted, got {batch_count}"
        );

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "All memory should be returned after consuming all batches"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_metadata() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let field_metadata: HashMap<String, String> =
            vec![("foo".to_string(), "bar".to_string())]
                .into_iter()
                .collect();
        let schema_metadata: HashMap<String, String> =
            vec![("baz".to_string(), "barf".to_string())]
                .into_iter()
                .collect();

        let mut field = Field::new("field_name", DataType::UInt64, true);
        field.set_metadata(field_metadata.clone());
        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
        let schema = Arc::new(schema);

        let data: ArrayRef =
            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
        let input =
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("field_name", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            input,
        ));

        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;

        let expected_data: ArrayRef =
            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
        let expected_batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;

        assert_eq!(&vec![expected_batch], &result);

        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
        assert_eq!(result[0].schema().metadata(), &schema_metadata);

        Ok(())
    }

    #[tokio::test]
    async fn test_lex_sort_by_mixed_types() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(3)]),
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            [
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: false,
                    },
                },
            ]
            .into(),
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
        ));

        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
        assert_eq!(
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            *sort_exec.schema().field(1).data_type()
        );

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 4);
        assert_eq!(result.len(), 1);

        let expected = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                    Some(vec![Some(3)]),
                ])),
            ],
        )?;

        assert_eq!(expected, result[0]);

        Ok(())
    }

    #[tokio::test]
    async fn test_lex_sort_by_float() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, true),
            Field::new("b", DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![
                    Some(f32::NAN),
                    None,
                    None,
                    Some(f32::NAN),
                    Some(1.0_f32),
                    Some(1.0_f32),
                    Some(2.0_f32),
                    Some(3.0_f32),
                ])),
                Arc::new(Float64Array::from(vec![
                    Some(200.0_f64),
                    Some(20.0_f64),
                    Some(10.0_f64),
                    Some(100.0_f64),
                    Some(f64::NAN),
                    None,
                    None,
                    Some(f64::NAN),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            [
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                },
            ]
            .into(),
            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
        ));

        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 8);
        assert_eq!(result.len(), 1);

        let columns = result[0].columns();

        assert_eq!(DataType::Float32, *columns[0].data_type());
        assert_eq!(DataType::Float64, *columns[1].data_type());

        let a = as_primitive_array::<Float32Type>(&columns[0])?;
        let b = as_primitive_array::<Float64Type>(&columns[1])?;

        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
            .map(|i| {
                let aval = if a.is_valid(i) {
                    Some(a.value(i).to_string())
                } else {
                    None
                };
                let bval = if b.is_valid(i) {
                    Some(b.value(i).to_string())
                } else {
                    None
                };
                (aval, bval)
            })
            .collect();

        let expected: Vec<(Option<String>, Option<String>)> = vec![
            (None, Some("10".to_owned())),
            (None, Some("20".to_owned())),
            (Some("NaN".to_owned()), Some("100".to_owned())),
            (Some("NaN".to_owned()), Some("200".to_owned())),
            (Some("3".to_owned()), Some("NaN".to_owned())),
            (Some("2".to_owned()), None),
            (Some("1".to_owned()), Some("NaN".to_owned())),
            (Some("1".to_owned()), None),
        ];

        assert_eq!(expected, result);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();
        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            blocking_exec,
        ));

        let fut = collect(sort_exec, Arc::clone(&task_ctx));
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[test]
    fn test_empty_sort_batch() {
        let schema = Arc::new(Schema::empty());
        let options = RecordBatchOptions::new().with_row_count(Some(1));
        let batch =
            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
                .unwrap();

        let expressions = [PhysicalSortExpr {
            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
            options: SortOptions::default(),
        }]
        .into();

        let result = sort_batch(&batch, &expressions, None).unwrap();
        assert_eq!(result.num_rows(), 1);
    }

    #[tokio::test]
    async fn topk_unbounded_source() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
        let source = SortedUnboundedExec {
            schema: schema.clone(),
            batch_size: 2,
            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
        };
        let mut plan = SortExec::new(
            [PhysicalSortExpr::new_default(Arc::new(Column::new(
                "c1", 0,
            )))]
            .into(),
            Arc::new(source),
        );
        plan = plan.with_fetch(Some(9));

        let batches = collect(Arc::new(plan), task_ctx).await?;
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+
        | c1 |
        +----+
        | 0  |
        | 1  |
        | 2  |
        | 3  |
        | 4  |
        | 5  |
        | 6  |
        | 7  |
        | 8  |
        +----+
        ");
        Ok(())
    }

    #[tokio::test]
    async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
        let batch_size = 100;

        let create_task_ctx = |_: &[RecordBatch]| {
            TaskContext::default().with_session_config(
                SessionConfig::new()
                    .with_batch_size(batch_size)
                    .with_sort_in_place_threshold_bytes(usize::MAX),
            )
        };

        test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;

        test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;

        test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;

        Ok(())
    }

    #[tokio::test]
    async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place()
    -> Result<()> {
        let batch_size = 100;

        let create_task_ctx = |_: &[RecordBatch]| {
            TaskContext::default().with_session_config(
                SessionConfig::new()
                    .with_batch_size(batch_size)
                    .with_sort_in_place_threshold_bytes(usize::MAX - 1),
            )
        };

        {
            let metrics =
                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;

            assert_eq!(
                metrics.spill_count(),
                Some(0),
                "Expected no spills when sorting in place"
            );
        }

        {
            let metrics =
                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;

            assert_eq!(
                metrics.spill_count(),
                Some(0),
                "Expected no spills when sorting in place"
            );
        }

        {
            let metrics =
                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;

            assert_eq!(
                metrics.spill_count(),
                Some(0),
                "Expected no spills when sorting in place"
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch()
    -> Result<()> {
        let batch_size = 100;

        let create_task_ctx = |_: &[RecordBatch]| {
            TaskContext::default()
                .with_session_config(SessionConfig::new().with_batch_size(batch_size))
        };

        {
            let metrics = test_sort_output_batch_size(
                1,
                batch_size / 4,
                create_task_ctx,
            )
            .await?;

            assert_eq!(
                metrics.spill_count(),
                Some(0),
                "Expected no spills when sorting in place"
            );
        }

        {
            let metrics = test_sort_output_batch_size(
                1,
                batch_size + 7,
                create_task_ctx,
            )
            .await?;

            assert_eq!(
                metrics.spill_count(),
                Some(0),
                "Expected no spills when sorting in place"
            );
        }

        {
            let metrics = test_sort_output_batch_size(
                1,
                batch_size * 3,
                create_task_ctx,
            )
            .await?;

            assert_eq!(
                metrics.spill_count(),
                Some(0),
                "Expected no spills when sorting in place"
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill()
    -> Result<()> {
        let batch_size = 100;

        let create_task_ctx = |generated_batches: &[RecordBatch]| {
            let batches_memory = generated_batches
                .iter()
                .map(|b| b.get_array_memory_size())
                .sum::<usize>();

            TaskContext::default()
                .with_session_config(
                    SessionConfig::new()
                        .with_batch_size(batch_size)
                        .with_sort_in_place_threshold_bytes(1)
                        .with_sort_spill_reservation_bytes(1),
                )
                .with_runtime(
                    RuntimeEnvBuilder::default()
                        .with_memory_limit(batches_memory, 1.0)
                        .build_arc()
                        .unwrap(),
                )
        };

        {
            let metrics =
                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;

            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
        }

        {
            let metrics =
                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;

            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
        }

        {
            let metrics =
                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;

            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
        }

        Ok(())
    }

    async fn test_sort_output_batch_size(
        number_of_batches: usize,
        batch_size_to_generate: usize,
        create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
    ) -> Result<MetricsSet> {
        let batches = (0..number_of_batches)
            .map(|_| make_partition(batch_size_to_generate as i32))
            .collect::<Vec<_>>();
        let task_ctx = create_task_ctx(batches.as_slice());

        let expected_batch_size = task_ctx.session_config().batch_size();

        let (mut output_batches, metrics) =
            run_sort_on_input(task_ctx, "i", batches).await?;

        let last_batch = output_batches.pop().unwrap();

        for batch in output_batches {
            assert_eq!(batch.num_rows(), expected_batch_size);
        }

        let mut last_expected_batch_size =
            (batch_size_to_generate * number_of_batches) % expected_batch_size;
        if last_expected_batch_size == 0 {
            last_expected_batch_size = expected_batch_size;
        }
        assert_eq!(last_batch.num_rows(), last_expected_batch_size);

        Ok(metrics)
    }

    async fn run_sort_on_input(
        task_ctx: TaskContext,
        order_by_col: &str,
        batches: Vec<RecordBatch>,
    ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
        let task_ctx = Arc::new(task_ctx);

        let schema = batches[0].schema();
        let ordering: LexOrdering = [PhysicalSortExpr {
            expr: col(order_by_col, &schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: true,
            },
        }]
        .into();
        let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
            ordering.clone(),
            TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
        ));

        let sorted_batches =
            collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;

        let metrics = sort_exec.metrics().expect("sort should have metrics");

        {
            let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
            let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;

            let sorted_batches_concat =
                concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;

            assert_eq!(sorted_input_batch, sorted_batches_concat);
        }

        Ok((sorted_batches, metrics))
    }

    #[tokio::test]
    async fn test_sort_batch_chunked_basic() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        let mut values: Vec<i32> = (0..1000).collect();
        values.reverse();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values))],
        )?;

        let expressions: LexOrdering =
            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

        let result_batches = sort_batch_chunked(&batch, &expressions, 250)?;

        assert_eq!(result_batches.len(), 4);

        let mut total_rows = 0;
        for (i, batch) in result_batches.iter().enumerate() {
            assert!(
                batch.num_rows() <= 250,
                "Batch {} has {} rows, expected <= 250",
                i,
                batch.num_rows()
            );
            total_rows += batch.num_rows();
        }

        assert_eq!(total_rows, 1000);

        let concatenated = concat_batches(&schema, &result_batches)?;
        let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
        for i in 0..array.len() - 1 {
            assert!(
                array.value(i) <= array.value(i + 1),
                "Array not sorted at position {}: {} > {}",
                i,
                array.value(i),
                array.value(i + 1)
            );
        }
        assert_eq!(array.value(0), 0);
        assert_eq!(array.value(array.len() - 1), 999);

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_batch_chunked_smaller_than_batch_size() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        let values: Vec<i32> = (0..50).rev().collect();
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values))],
        )?;

        let expressions: LexOrdering =
            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;

        assert_eq!(result_batches.len(), 1);
        assert_eq!(result_batches[0].num_rows(), 50);

        let array = as_primitive_array::<Int32Type>(result_batches[0].column(0))?;
        for i in 0..array.len() - 1 {
            assert!(array.value(i) <= array.value(i + 1));
        }
        assert_eq!(array.value(0), 0);
        assert_eq!(array.value(49), 49);

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_batch_chunked_exact_multiple() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        let values: Vec<i32> = (0..1000).rev().collect();
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values))],
        )?;

        let expressions: LexOrdering =
            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;

        assert_eq!(result_batches.len(), 10);
        for batch in &result_batches {
            assert_eq!(batch.num_rows(), 100);
        }

        let concatenated = concat_batches(&schema, &result_batches)?;
        let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
        for i in 0..array.len() - 1 {
            assert!(array.value(i) <= array.value(i + 1));
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_batch_chunked_empty_batch() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        let batch = RecordBatch::new_empty(Arc::clone(&schema));

        let expressions: LexOrdering =
            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;

        assert_eq!(result_batches.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_reserved_bytes_for_record_batch_with_sliced_batches() -> Result<()>
    {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
        let sliced_array = large_array.slice(100, 50);
        let sliced_batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(sliced_array)])?;
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(large_array)])?;

        let sliced_reserved = get_reserved_bytes_for_record_batch(&sliced_batch)?;
        let reserved = get_reserved_bytes_for_record_batch(&batch)?;

        assert!(reserved > sliced_reserved);

        Ok(())
    }
}