1use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28
29use crate::common::spawn_buffered;
30use crate::execution_plan::{
31 Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
32};
33use crate::expressions::PhysicalSortExpr;
34use crate::filter_pushdown::{
35 ChildFilterDescription, FilterDescription, FilterPushdownPhase,
36};
37use crate::limit::LimitStream;
38use crate::metrics::{
39 BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
40};
41use crate::projection::{ProjectionExec, make_with_child, update_ordering};
42use crate::sorts::IncrementalSortIterator;
43use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
44use crate::spill::get_record_batch_memory_size;
45use crate::spill::in_progress_spill_file::InProgressSpillFile;
46use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
47use crate::stream::RecordBatchStreamAdapter;
48use crate::stream::ReservationStream;
49use crate::topk::TopK;
50use crate::topk::TopKDynamicFilters;
51use crate::{
52 DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
53 ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
54 Statistics,
55};
56
57use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
58use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
59use arrow::datatypes::SchemaRef;
60use datafusion_common::config::SpillCompression;
61use datafusion_common::{
62 DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
63 unwrap_or_internal_err,
64};
65use datafusion_execution::TaskContext;
66use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
67use datafusion_execution::runtime_env::RuntimeEnv;
68use datafusion_physical_expr::LexOrdering;
69use datafusion_physical_expr::PhysicalExpr;
70use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
71
72use futures::{StreamExt, TryStreamExt};
73use log::{debug, trace};
74
/// Metrics collected by an `ExternalSorter` for a single partition.
struct ExternalSorterMetrics {
    /// Common metrics shared by all operators (output rows, elapsed compute, ...)
    baseline: BaselineMetrics,

    /// Metrics about data spilled to disk (spill file count, spilled bytes/rows)
    spill_metrics: SpillMetrics,
}
81
82impl ExternalSorterMetrics {
83 fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
84 Self {
85 baseline: BaselineMetrics::new(metrics, partition),
86 spill_metrics: SpillMetrics::new(metrics, partition),
87 }
88 }
89}
90
/// Sorter that buffers input batches in memory and spills sorted runs to disk
/// when its memory reservation cannot grow, merging everything in `sort()`.
struct ExternalSorter {
    // ==== Configuration (fixed after construction) ====
    /// Schema of the input (and output) batches
    schema: SchemaRef,
    /// Sort expressions defining the output ordering
    expr: LexOrdering,
    /// Target number of rows per output batch
    batch_size: usize,
    /// If the reserved in-memory size is below this threshold, sort by
    /// concatenating all buffered batches into one batch instead of merging
    sort_in_place_threshold_bytes: usize,

    // ==== Mutable state ====
    /// Unsorted input batches buffered in memory, accounted in `reservation`
    in_mem_batches: Vec<RecordBatch>,

    /// Spill file currently being written, paired with the maximum sliced
    /// size (bytes) of any batch appended to it so far
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    /// Completed sorted spill files awaiting the final streaming merge
    finished_spill_files: Vec<SortedSpillFile>,

    // ==== Execution resources ====
    /// Metrics for this sorter (baseline + spill)
    metrics: ExternalSorterMetrics,
    /// Runtime environment providing the memory pool and disk manager
    runtime: Arc<RuntimeEnv>,
    /// Memory reservation tracking the buffered/sorted batches
    reservation: MemoryReservation,
    /// Creates and writes spill files
    spill_manager: SpillManager,

    /// Reservation held in advance for the merge phase, so merging does not
    /// fail with an out-of-memory error after data has already been buffered
    merge_reservation: MemoryReservation,
    /// Configured size (bytes) kept in `merge_reservation` while spilling is enabled
    sort_spill_reservation_bytes: usize,
}
264
impl ExternalSorter {
    /// Creates a new sorter for `partition_id`, registering two memory
    /// consumers in the runtime's pool: one spillable reservation for
    /// buffered batches and one (non-spillable) for the merge phase.
    ///
    /// # Errors
    /// Currently always returns `Ok`; the `Result` return preserves room for
    /// fallible setup.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        spill_compression: SpillCompression,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<Self> {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        // Main reservation is marked spillable: when it cannot grow we spill
        // sorted data to disk instead of failing.
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);

        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);

        let spill_manager = SpillManager::new(
            Arc::clone(&runtime),
            metrics.spill_metrics.clone(),
            Arc::clone(&schema),
        )
        .with_compression_type(spill_compression);

        Ok(Self {
            schema,
            in_mem_batches: vec![],
            in_progress_spill_file: None,
            finished_spill_files: vec![],
            expr,
            metrics,
            reservation,
            spill_manager,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        })
    }

    /// Buffers `input` in memory, first growing the reservation; if the
    /// reservation cannot grow, the currently buffered batches are sorted and
    /// spilled to disk before retrying. Empty batches are ignored.
    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
        if input.num_rows() == 0 {
            return Ok(());
        }

        self.reserve_memory_for_merge()?;
        self.reserve_memory_for_batch_and_maybe_spill(&input)
            .await?;

        self.in_mem_batches.push(input);
        Ok(())
    }

    /// Returns `true` if at least one sorted run has been spilled to disk.
    fn spilled_before(&self) -> bool {
        !self.finished_spill_files.is_empty()
    }

    /// Produces the final sorted output stream.
    ///
    /// If any data was spilled, remaining in-memory batches are sorted and
    /// spilled too, and all spill files are streaming-merged; otherwise the
    /// buffered batches are sorted entirely in memory.
    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
        // Release the merge-phase reservation; the merge below accounts for
        // its own memory via `new_empty()` reservations.
        self.merge_reservation.free();

        if self.spilled_before() {
            if !self.in_mem_batches.is_empty() {
                self.sort_and_spill_in_mem_batches().await?;
            }

            StreamingMergeBuilder::new()
                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
                .with_spill_manager(self.spill_manager.clone())
                .with_schema(Arc::clone(&self.schema))
                .with_expressions(&self.expr.clone())
                .with_metrics(self.metrics.baseline.clone())
                .with_batch_size(self.batch_size)
                .with_fetch(None)
                .with_reservation(self.merge_reservation.new_empty())
                .build()
        } else {
            self.in_mem_sort_stream(self.metrics.baseline.clone())
        }
    }

    /// Bytes currently held by the main memory reservation.
    fn used(&self) -> usize {
        self.reservation.size()
    }

    /// Total bytes spilled to disk so far (from metrics).
    fn spilled_bytes(&self) -> usize {
        self.metrics.spill_metrics.spilled_bytes.value()
    }

    /// Total rows spilled to disk so far (from metrics).
    fn spilled_rows(&self) -> usize {
        self.metrics.spill_metrics.spilled_rows.value()
    }

    /// Number of spill files created so far (from metrics).
    fn spill_count(&self) -> usize {
        self.metrics.spill_metrics.spill_file_count.value()
    }

    /// Consumes `globally_sorted_batches` (leaving it empty) and appends them
    /// to the in-progress spill file, creating the file if needed.
    ///
    /// Also frees the main reservation (the batches are leaving memory) and
    /// tracks the largest sliced batch size written to the file, which the
    /// later merge uses for its memory estimate.
    async fn consume_and_spill_append(
        &mut self,
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        if globally_sorted_batches.is_empty() {
            return Ok(());
        }

        // Lazily create the spill file on first append.
        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
        }

        // Compact StringView buffers so we don't spill unreferenced bytes.
        Self::organize_stringview_arrays(globally_sorted_batches)?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let batches_to_spill = std::mem::take(globally_sorted_batches);
        self.reservation.free();

        let (in_progress_file, max_record_batch_size) =
            self.in_progress_spill_file.as_mut().ok_or_else(|| {
                internal_datafusion_err!("In-progress spill file should be initialized")
            })?;

        for batch in batches_to_spill {
            in_progress_file.append_batch(&batch)?;

            *max_record_batch_size =
                (*max_record_batch_size).max(batch.get_sliced_size()?);
        }

        assert_or_internal_err!(
            globally_sorted_batches.is_empty(),
            "This function consumes globally_sorted_batches, so it should be empty after taking."
        );

        Ok(())
    }

    /// Finalizes the in-progress spill file and records it (with its max
    /// batch size) in `finished_spill_files`.
    ///
    /// # Errors
    /// Returns an internal error if called with no in-progress spill file.
    async fn spill_finish(&mut self) -> Result<()> {
        let (mut in_progress_file, max_record_batch_memory) =
            self.in_progress_spill_file.take().ok_or_else(|| {
                internal_datafusion_err!("Should be called after `spill_append`")
            })?;
        let spill_file = in_progress_file.finish()?;

        // `finish()` returns `None` when nothing was actually written.
        if let Some(spill_file) = spill_file {
            self.finished_spill_files.push(SortedSpillFile {
                file: spill_file,
                max_record_batch_memory,
            });
        }

        Ok(())
    }

    /// Garbage-collects `StringViewArray` columns in each batch (via `gc()`)
    /// so that only referenced string bytes are written when spilling; other
    /// array types are kept as-is.
    fn organize_stringview_arrays(
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

        for batch in globally_sorted_batches.drain(..) {
            let mut new_columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(batch.num_columns());

            let mut arr_mutated = false;
            for array in batch.columns() {
                if let Some(string_view_array) =
                    array.as_any().downcast_ref::<StringViewArray>()
                {
                    let new_array = string_view_array.gc();
                    new_columns.push(Arc::new(new_array));
                    arr_mutated = true;
                } else {
                    new_columns.push(Arc::clone(array));
                }
            }

            // Only rebuild the batch if at least one column changed.
            let organized_batch = if arr_mutated {
                RecordBatch::try_new(batch.schema(), new_columns)?
            } else {
                batch
            };

            organized_batches.push(organized_batch);
        }

        *globally_sorted_batches = organized_batches;

        Ok(())
    }

    /// Sorts all buffered in-memory batches and writes the sorted run to a
    /// single spill file.
    ///
    /// While draining the sorted stream, each output batch is re-accounted in
    /// the main reservation; whenever the reservation cannot grow, the
    /// batches collected so far are flushed to the spill file.
    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
        assert_or_internal_err!(
            !self.in_mem_batches.is_empty(),
            "in_mem_batches must not be empty when attempting to sort and spill"
        );

        // Temporarily release the merge reservation so the sort below can use
        // that memory; it is re-acquired at the end of this function.
        self.merge_reservation.free();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
        assert_or_internal_err!(
            self.in_mem_batches.is_empty(),
            "in_mem_batches should be empty after constructing sorted stream"
        );
        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
            if self.reservation.try_grow(sorted_size).is_err() {
                // Out of budget: flush what we have (including this batch).
                globally_sorted_batches.push(batch);
                self.consume_and_spill_append(&mut globally_sorted_batches)
                    .await?;
            } else {
                globally_sorted_batches.push(batch);
            }
        }

        // Drop the stream before spilling the remainder so its resources
        // (and reservations) are released first.
        drop(sorted_stream);

        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?;
        self.spill_finish().await?;

        let buffers_cleared_property =
            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
        assert_or_internal_err!(
            buffers_cleared_property,
            "in_mem_batches and globally_sorted_batches should be cleared before"
        );

        // Re-acquire the merge-phase reservation released above.
        self.reserve_memory_for_merge()?;

        Ok(())
    }

    /// Builds a stream of sorted batches from `in_mem_batches`, consuming them.
    ///
    /// Strategy, in order:
    /// 1. no batches -> empty stream;
    /// 2. exactly one batch -> sort it directly;
    /// 3. total reserved size below `sort_in_place_threshold_bytes` ->
    ///    concatenate everything into one batch and sort it;
    /// 4. otherwise sort each batch independently and streaming-merge them.
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }

        // Time the (synchronous) setup work below.
        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();

        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            // Hand the whole reservation over to the sort stream.
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            // Resize to the concatenated batch's size before taking it.
            self.reservation
                .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
                .map_err(Self::err_with_oom_context)?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        // Per-batch sort streams, each owning a split of the reservation,
        // buffered on separate tasks and merged.
        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                let reservation = self
                    .reservation
                    .split(get_reserved_bytes_for_record_batch(&batch)?);
                let input = self.sort_batch_stream(batch, &metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;

        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(&self.expr.clone())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(None)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }

    /// Sorts a single batch into a stream of `batch_size`-row chunks.
    ///
    /// `reservation` must equal the batch's reserved size on entry; it is
    /// resized to the sorted output's size and then travels with the stream
    /// (freed as batches are consumed).
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: &BaselineMetrics,
        reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(
            get_reserved_bytes_for_record_batch(&batch)?,
            reservation.size()
        );

        let schema = batch.schema();
        let expressions = self.expr.clone();
        let batch_size = self.batch_size;
        let output_row_metrics = metrics.output_rows().clone();

        let stream = futures::stream::once(async move {
            let schema = batch.schema();

            let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;

            // Re-account for the sorted copies (the unsorted input is dropped
            // when `batch` goes out of scope).
            let total_sorted_size: usize = sorted_batches
                .iter()
                .map(get_record_batch_memory_size)
                .sum();
            reservation
                .try_resize(total_sorted_size)
                .map_err(Self::err_with_oom_context)?;

            Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
                Arc::clone(&schema),
                Box::pin(RecordBatchStreamAdapter::new(
                    Arc::clone(&schema),
                    futures::stream::iter(sorted_batches.into_iter().map(Ok)),
                )),
                reservation,
            )) as SendableRecordBatchStream)
        })
        .try_flatten()
        .map(move |batch| match batch {
            Ok(batch) => {
                output_row_metrics.add(batch.num_rows());
                Ok(batch)
            }
            Err(e) => Err(e),
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    /// Ensures `merge_reservation` holds `sort_spill_reservation_bytes`, but
    /// only when spilling is possible (temporary files enabled); otherwise
    /// pre-reserving merge memory would waste budget for nothing.
    fn reserve_memory_for_merge(&mut self) -> Result<()> {
        if self.runtime.disk_manager.tmp_files_enabled() {
            let size = self.sort_spill_reservation_bytes;
            if self.merge_reservation.size() != size {
                self.merge_reservation
                    .try_resize(size)
                    .map_err(Self::err_with_oom_context)?;
            }
        }

        Ok(())
    }

    /// Grows the main reservation by `input`'s size; on failure, spills the
    /// currently buffered batches and retries once. If nothing is buffered
    /// yet, the original error is returned (with an OOM hint).
    async fn reserve_memory_for_batch_and_maybe_spill(
        &mut self,
        input: &RecordBatch,
    ) -> Result<()> {
        let size = get_reserved_bytes_for_record_batch(input)?;

        match self.reservation.try_grow(size) {
            Ok(_) => Ok(()),
            Err(e) => {
                if self.in_mem_batches.is_empty() {
                    return Err(Self::err_with_oom_context(e));
                }

                // Spilling frees the reservation, so the retry should succeed
                // unless the single batch alone exceeds the budget.
                self.sort_and_spill_in_mem_batches().await?;
                self.reservation
                    .try_grow(size)
                    .map_err(Self::err_with_oom_context)
            }
        }
    }

    /// Attaches a configuration hint to `ResourcesExhausted` errors; other
    /// errors are returned unchanged.
    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
        match e {
            DataFusionError::ResourcesExhausted(_) => e.context(
                "Not enough memory to continue external sort. \
                Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', \
                or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'."
            ),
            _ => e,
        }
    }
}
822
/// Computes the number of bytes to reserve for a record batch, given its
/// in-memory size and its sliced size: the reservation is simply their sum.
pub(crate) fn get_reserved_bytes_for_record_batch_size(
    record_batch_size: usize,
    sliced_size: usize,
) -> usize {
    let reserved = record_batch_size + sliced_size;
    reserved
}
842
843pub(crate) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result<usize> {
847 batch.get_sliced_size().map(|sliced_size| {
848 get_reserved_bytes_for_record_batch_size(
849 get_record_batch_memory_size(batch),
850 sliced_size,
851 )
852 })
853}
854
855impl Debug for ExternalSorter {
856 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
857 f.debug_struct("ExternalSorter")
858 .field("memory_used", &self.used())
859 .field("spilled_bytes", &self.spilled_bytes())
860 .field("spilled_rows", &self.spilled_rows())
861 .field("spill_count", &self.spill_count())
862 .finish()
863 }
864}
865
866pub fn sort_batch(
867 batch: &RecordBatch,
868 expressions: &LexOrdering,
869 fetch: Option<usize>,
870) -> Result<RecordBatch> {
871 let sort_columns = expressions
872 .iter()
873 .map(|expr| expr.evaluate_to_sort_column(batch))
874 .collect::<Result<Vec<_>>>()?;
875
876 let indices = lexsort_to_indices(&sort_columns, fetch)?;
877 let columns = take_arrays(batch.columns(), &indices, None)?;
878
879 let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
880 Ok(RecordBatch::try_new_with_options(
881 batch.schema(),
882 columns,
883 &options,
884 )?)
885}
886
887pub fn sort_batch_chunked(
891 batch: &RecordBatch,
892 expressions: &LexOrdering,
893 batch_size: usize,
894) -> Result<Vec<RecordBatch>> {
895 IncrementalSortIterator::new(batch.clone(), expressions.clone(), batch_size).collect()
896}
897
/// Physical operator that sorts its input by a lexicographic ordering,
/// optionally limited to the top `fetch` rows (TopK mode).
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input execution plan
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions defining the output ordering
    expr: LexOrdering,
    /// Execution metrics, shared across partitions
    metrics_set: ExecutionPlanMetricsSet,
    /// When `true`, sort each input partition independently; when `false`,
    /// produce a single sorted output partition
    preserve_partitioning: bool,
    /// Optional limit: emit only the first `fetch` sorted rows (TopK)
    fetch: Option<usize>,
    /// Prefix of `expr` already satisfied by the input's ordering
    common_sort_prefix: Vec<PhysicalSortExpr>,
    /// Cached plan properties (partitioning, ordering, boundedness, ...)
    cache: Arc<PlanProperties>,
    /// Dynamic filter used for TopK pushdown; present only when `fetch` is set
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
924
impl SortExec {
    /// Creates a new sort execution plan that sorts `input` by `expr` into a
    /// single output partition (partitioning not preserved).
    ///
    /// # Panics
    /// Panics if computing the plan properties fails (kept infallible for
    /// API compatibility).
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let preserve_partitioning = false;
        let (cache, sort_prefix) =
            Self::compute_properties(&input, expr.clone(), preserve_partitioning)
                .unwrap();
        Self {
            expr,
            input,
            metrics_set: ExecutionPlanMetricsSet::new(),
            preserve_partitioning,
            fetch: None,
            common_sort_prefix: sort_prefix,
            cache: Arc::new(cache),
            filter: None,
        }
    }

    /// Whether this sort keeps the input's partitioning (sorting each
    /// partition separately) instead of producing one output partition.
    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    /// Sets `preserve_partitioning` and updates the cached output
    /// partitioning accordingly.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        Arc::make_mut(&mut self.cache).partitioning =
            Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
        self
    }

    /// Builds a fresh TopK dynamic filter over the sort expressions,
    /// initialized to the always-true predicate.
    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
        let children = self
            .expr
            .iter()
            .map(|sort_expr| Arc::clone(&sort_expr.expr))
            .collect::<Vec<_>>();
        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(children, lit(true)),
        ))))
    }

    /// Shallow clone: shares the input, metrics, cached properties, and
    /// dynamic filter via `Arc`s.
    fn cloned(&self) -> Self {
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            common_sort_prefix: self.common_sort_prefix.clone(),
            fetch: self.fetch,
            cache: Arc::clone(&self.cache),
            filter: self.filter.clone(),
        }
    }

    /// Returns a copy of this plan with the given `fetch` limit (TopK mode
    /// when `Some`).
    ///
    /// Adding a fetch on a pipeline-friendly (incremental) plan makes the
    /// output bounded; a dynamic filter is created (or reused) only when a
    /// fetch is set, and dropped when it is cleared.
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = PlanProperties::clone(&self.cache);
        let is_pipeline_friendly = matches!(
            cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        let filter = fetch.is_some().then(|| {
            self.filter.clone().unwrap_or_else(|| self.create_filter())
        });
        let mut new_sort = self.cloned();
        new_sort.fetch = fetch;
        new_sort.cache = cache.into();
        new_sort.filter = filter;
        new_sort
    }

    /// Input plan of this sort.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions used by this plan.
    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    /// Optional row limit (TopK) of this plan.
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Output partitioning: same as the input when partitioning is
    /// preserved, otherwise a single (unknown) partition.
    fn output_partitioning_helper(
        input: &Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Partitioning {
        if preserve_partitioning {
            input.output_partitioning().clone()
        } else {
            Partitioning::UnknownPartitioning(1)
        }
    }

    /// Computes the plan properties (equivalences, partitioning, emission
    /// type, boundedness) and the prefix of `sort_exprs` already satisfied by
    /// the input's ordering.
    ///
    /// If the input already satisfies the full ordering, the sort is a
    /// pass-through (input's emission/boundedness); otherwise output is only
    /// emitted at the end, and an unbounded input would require infinite
    /// memory.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
        let (sort_prefix, sort_satisfied) = input
            .equivalence_properties()
            .extract_common_sort_prefix(sort_exprs.clone())?;

        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };

        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };

        // The output additionally satisfies the requested ordering.
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.reorder(sort_exprs)?;

        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);

        Ok((
            PlanProperties::new(
                eq_properties,
                output_partitioning,
                emission_type,
                boundedness,
            ),
            sort_prefix,
        ))
    }
}
1102
1103impl DisplayAs for SortExec {
1104 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1105 match t {
1106 DisplayFormatType::Default | DisplayFormatType::Verbose => {
1107 let preserve_partitioning = self.preserve_partitioning;
1108 match self.fetch {
1109 Some(fetch) => {
1110 write!(
1111 f,
1112 "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
1113 self.expr
1114 )?;
1115 if let Some(filter) = &self.filter
1116 && let Ok(current) = filter.read().expr().current()
1117 && !current.eq(&lit(true))
1118 {
1119 write!(f, ", filter=[{current}]")?;
1120 }
1121 if !self.common_sort_prefix.is_empty() {
1122 write!(f, ", sort_prefix=[")?;
1123 let mut first = true;
1124 for sort_expr in &self.common_sort_prefix {
1125 if first {
1126 first = false;
1127 } else {
1128 write!(f, ", ")?;
1129 }
1130 write!(f, "{sort_expr}")?;
1131 }
1132 write!(f, "]")
1133 } else {
1134 Ok(())
1135 }
1136 }
1137 None => write!(
1138 f,
1139 "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
1140 self.expr
1141 ),
1142 }
1143 }
1144 DisplayFormatType::TreeRender => match self.fetch {
1145 Some(fetch) => {
1146 writeln!(f, "{}", self.expr)?;
1147 writeln!(f, "limit={fetch}")
1148 }
1149 None => {
1150 writeln!(f, "{}", self.expr)
1151 }
1152 },
1153 }
1154 }
1155}
1156
impl ExecutionPlan for SortExec {
    /// Operator name; distinguishes TopK mode (`fetch` set) from a full sort.
    fn name(&self) -> &'static str {
        match self.fetch {
            Some(_) => "SortExec(TopK)",
            None => "SortExec",
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// When producing a single sorted output, require a single input
    /// partition; otherwise any distribution works (each partition is sorted
    /// independently).
    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    /// Replaces the child plan, recomputing cached properties and the common
    /// sort prefix only when the new child's properties differ.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut new_sort = self.cloned();
        assert_eq!(children.len(), 1, "SortExec should have exactly one child");
        new_sort.input = Arc::clone(&children[0]);

        if !has_same_children_properties(self.as_ref(), &children)? {
            let (cache, sort_prefix) = Self::compute_properties(
                &new_sort.input,
                new_sort.expr.clone(),
                new_sort.preserve_partitioning,
            )?;
            new_sort.cache = Arc::new(cache);
            new_sort.common_sort_prefix = sort_prefix;
        }

        Ok(Arc::new(new_sort))
    }

    /// Returns a copy with per-execution state reset: a fresh dynamic filter
    /// and a fresh metrics set.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        let new_sort = self.with_new_children(children)?;
        let mut new_sort = new_sort
            .as_any()
            .downcast_ref::<SortExec>()
            .expect("cloned 1 lines above this line, we know the type")
            .clone();
        new_sort.filter = Some(new_sort.create_filter());
        new_sort.metrics_set = ExecutionPlanMetricsSet::new();

        Ok(Arc::new(new_sort))
    }

    /// Executes the sort for one partition.
    ///
    /// Dispatches on (input already sorted?, fetch?):
    /// - sorted + fetch: just limit the input stream;
    /// - sorted + no fetch: pass the input through unchanged;
    /// - unsorted + fetch: TopK with the dynamic filter (errors internally if
    ///   the filter is missing);
    /// - unsorted + no fetch: full external sort via [`ExternalSorter`].
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start SortExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {partition}");

        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy(self.expr.clone())?;

        match (sort_satisfied, self.fetch.as_ref()) {
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            (true, None) => Ok(input),
            (false, Some(fetch)) => {
                let filter = self.filter.clone();
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.common_sort_prefix.clone(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                    Arc::clone(&unwrap_or_internal_err!(filter)),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                            // TopK can conclude early (e.g. sorted prefix
                            // exhausted) — stop reading input then.
                            if topk.finished {
                                break;
                            }
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    context.session_config().spill_compression(),
                    &self.metrics_set,
                    context.runtime_env(),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        // Drain the entire input before emitting any output.
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort().await
                    })
                    .try_flatten(),
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    /// Input statistics adjusted for `fetch`; when producing a single merged
    /// output partition, statistics are taken over all input partitions.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if !self.preserve_partitioning() {
            return self
                .input
                .partition_statistics(None)?
                .with_fetch(self.fetch, 0, 1);
        }
        self.input
            .partition_statistics(partition)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Sorting alone preserves cardinality; with a fetch limit it can reduce it.
    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.fetch.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    /// Pushes a column-reducing projection below this sort when the sort
    /// expressions can be rewritten in terms of the projection; returns
    /// `None` when not beneficial or not possible.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Only worthwhile if the projection narrows the schema.
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }

        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
        else {
            return Ok(None);
        };

        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }

    /// Forwards parent filters to the child; in the post-optimization phase,
    /// additionally attaches this sort's TopK dynamic filter when enabled.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &datafusion_common::config::ConfigOptions,
    ) -> Result<FilterDescription> {
        if phase != FilterPushdownPhase::Post {
            return FilterDescription::from_children(parent_filters, &self.children());
        }

        let mut child =
            ChildFilterDescription::from_child(&parent_filters, self.input())?;

        if let Some(filter) = &self.filter
            && config.optimizer.enable_topk_dynamic_filter_pushdown
        {
            child = child.with_self_filter(filter.read().expr());
        }

        Ok(FilterDescription::new().with_child(child))
    }
}
1392
1393#[cfg(test)]
1394mod tests {
1395 use std::collections::HashMap;
1396 use std::pin::Pin;
1397 use std::task::{Context, Poll};
1398
1399 use super::*;
1400 use crate::coalesce_partitions::CoalescePartitionsExec;
1401 use crate::collect;
1402 use crate::execution_plan::Boundedness;
1403 use crate::expressions::col;
1404 use crate::test;
1405 use crate::test::TestMemoryExec;
1406 use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
1407 use crate::test::{assert_is_pending, make_partition};
1408
1409 use arrow::array::*;
1410 use arrow::compute::SortOptions;
1411 use arrow::datatypes::*;
1412 use datafusion_common::cast::as_primitive_array;
1413 use datafusion_common::test_util::batches_to_string;
1414 use datafusion_common::{DataFusionError, Result, ScalarValue};
1415 use datafusion_execution::RecordBatchStream;
1416 use datafusion_execution::config::SessionConfig;
1417 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1418 use datafusion_physical_expr::EquivalenceProperties;
1419 use datafusion_physical_expr::expressions::{Column, Literal};
1420
1421 use futures::{FutureExt, Stream};
1422 use insta::assert_snapshot;
1423
    /// Test operator producing an endless, already-sorted stream of `u64`
    /// batches (see `SortedUnboundedStream`).
    #[derive(Debug, Clone)]
    pub struct SortedUnboundedExec {
        // Schema of the produced batches (single u64 column "c1")
        schema: Schema,
        // Number of rows per emitted batch
        batch_size: u64,
        // Cached plan properties (sorted on c1, unbounded)
        cache: Arc<PlanProperties>,
    }
1430
1431 impl DisplayAs for SortedUnboundedExec {
1432 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1433 match t {
1434 DisplayFormatType::Default
1435 | DisplayFormatType::Verbose
1436 | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1437 }
1438 Ok(())
1439 }
1440 }
1441
    impl SortedUnboundedExec {
        /// Builds plan properties declaring the output as sorted ascending on
        /// column `c1`, single unknown partition, final emission, and
        /// unbounded (without requiring infinite memory).
        fn compute_properties(schema: SchemaRef) -> PlanProperties {
            let mut eq_properties = EquivalenceProperties::new(schema);
            eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
                Column::new("c1", 0),
            ))]);
            PlanProperties::new(
                eq_properties,
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            )
        }
    }
1458
    impl ExecutionPlan for SortedUnboundedExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            &self.cache
        }

        // Leaf node: no children.
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        // No children to replace; returns self unchanged.
        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }

        /// Starts a fresh endless stream of sorted batches from offset 0.
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            Ok(Box::pin(SortedUnboundedStream {
                schema: Arc::new(self.schema.clone()),
                batch_size: self.batch_size,
                offset: 0,
            }))
        }
    }
1495
    /// Endless stream of single-column `u64` batches with monotonically
    /// increasing values; never yields `None`.
    #[derive(Debug)]
    pub struct SortedUnboundedStream {
        // Schema of the emitted batches
        schema: SchemaRef,
        // Rows per emitted batch
        batch_size: u64,
        // Value at which the next batch starts; advanced by `batch_size` per poll
        offset: u64,
    }
1502
    impl Stream for SortedUnboundedStream {
        type Item = Result<RecordBatch>;

        /// Always ready with the next batch of `batch_size` consecutive
        /// values starting at `offset`; the stream never terminates.
        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            let batch = SortedUnboundedStream::create_record_batch(
                Arc::clone(&self.schema),
                self.offset,
                self.batch_size,
            );
            self.offset += self.batch_size;
            Poll::Ready(Some(Ok(batch)))
        }
    }
1519
    impl RecordBatchStream for SortedUnboundedStream {
        /// Schema shared by every batch this stream emits.
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }
1525
1526 impl SortedUnboundedStream {
1527 fn create_record_batch(
1528 schema: SchemaRef,
1529 offset: u64,
1530 batch_size: u64,
1531 ) -> RecordBatch {
1532 let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1533 let array = UInt64Array::from(values);
1534 let array_ref: ArrayRef = Arc::new(array);
1535 RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1536 }
1537 }
1538
    /// Sorts 4 coalesced partitions fully in memory and checks that the
    /// result is a single batch and that all reserved memory is returned to
    /// the pool afterwards.
    #[tokio::test]
    async fn test_in_mem_sort() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let partitions = 4;
        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        // Single-partition sort on column "i" over the coalesced input.
        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(csv)),
        ));

        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 400);
        // No spilling occurred, so every reservation must have been freed.
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1567
    /// Sorting 100 partitions under a tight memory limit must spill to disk,
    /// record spill metrics, and still release all memory when finished.
    #[tokio::test]
    async fn test_sort_spill() -> Result<()> {
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;
        // Memory limit only slightly above the merge reservation, so the
        // input cannot be buffered fully in memory and spilling is forced.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        let partitions = 100;
        let input = test::scan_partitioned(partitions);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        assert_eq!(result.len(), 2);

        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 10000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        // Spill metrics are asserted as ranges rather than exact values:
        // the precise counts depend on batch sizing and memory accounting.
        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();
        assert!((3..=10).contains(&spill_count));
        assert!((9000..=10000).contains(&spilled_rows));
        assert!((38000..=44000).contains(&spilled_bytes));

        let columns = result[0].columns();

        // Spot-check ordering of the first output batch.
        let i = as_primitive_array::<Int32Type>(&columns[0])?;
        assert_eq!(i.value(0), 0);
        assert_eq!(i.value(i.len() - 1), 81);
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1638
    /// With a memory limit one byte below what the first batch needs, the
    /// sort must fail with `ResourcesExhausted` and the error message must
    /// mention the configuration knobs the user can tune.
    #[tokio::test]
    async fn test_batch_reservation_error() -> Result<()> {
        // No extra merge reservation, so the batch reservation alone
        // determines the memory requirement.
        let merge_reservation: usize = 0;
        let session_config =
            SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);

        let plan = test::scan_partitioned(1);

        // Measure how many bytes the sort will reserve for the first batch,
        // using a throwaway context so the real pool is untouched.
        let expected_batch_reservation = {
            let temp_ctx = Arc::new(TaskContext::default());
            let mut stream = plan.execute(0, Arc::clone(&temp_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            get_reserved_bytes_for_record_batch(&first_batch)?
        };

        // One byte short of what is required, guaranteeing failure.
        let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;

        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(memory_limit, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // Sanity check the setup: the measured reservation is stable across
        // contexts and really does exceed the configured limit.
        {
            let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            let batch_reservation = get_reserved_bytes_for_record_batch(&first_batch)?;

            assert_eq!(batch_reservation, expected_batch_reservation);
            assert!(memory_limit < (merge_reservation + batch_reservation));
        }

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
            plan,
        ));

        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;

        let err = result.unwrap_err();
        assert!(
            matches!(err, DataFusionError::Context(..)),
            "Assertion failed: expected a Context error, but got: {err:?}"
        );

        assert!(
            matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
            "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
        );

        // The error should point users at the settings they can adjust.
        let config_vector = vec![
            "datafusion.runtime.memory_limit",
            "datafusion.execution.sort_spill_reservation_bytes",
        ];
        let error_message = err.message().to_string();
        for config in config_vector.into_iter() {
            assert!(
                error_message.as_str().contains(config),
                "Config: '{}' should be contained in error message: {}.",
                config,
                error_message.as_str()
            );
        }

        Ok(())
    }
1715
    /// Sort a large UTF-8 dataset under a 500 KiB memory limit: the sort
    /// must spill, produce globally ordered strings, and free all memory.
    #[tokio::test]
    async fn test_sort_spill_utf8_strings() -> Result<()> {
        let session_config = SessionConfig::new()
            .with_batch_size(100)
            .with_sort_in_place_threshold_bytes(20 * 1024)
            .with_sort_spill_reservation_bytes(100 * 1024);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(500 * 1024, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        let input = test::scan_partitioned_utf8(200);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;

        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
        assert_eq!(num_rows, 20000);

        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 20000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        // Spill metrics are checked as ranges: exact counts depend on
        // memory accounting details that may shift between changes.
        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();

        assert!((4..=8).contains(&spill_count));
        assert!((15000..=20000).contains(&spilled_rows));
        assert!((900000..=1000000).contains(&spilled_bytes));

        // Verify global ordering across all output batches.
        let concated_result = concat_batches(&schema, &result)?;
        let columns = concated_result.columns();
        let string_array = as_string_array(&columns[0]);
        for i in 0..string_array.len() - 1 {
            assert!(string_array.value(i) <= string_array.value(i + 1));
        }

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1794
    /// With a memory limit sized so the full input cannot be buffered, a
    /// plain sort must spill while a `fetch`-limited (TopK) sort must not.
    #[tokio::test]
    async fn test_sort_fetch_memory_calculation() -> Result<()> {
        // Rough per-batch footprint used to size the memory limit below.
        let avg_batch_size = 400;
        let partitions = 4;

        // (fetch, expect_spillage) pairs.
        let test_options = vec![
            // A full sort of the input does not fit in memory: must spill.
            (None, true),
            // fetch == 1 keeps only a tiny working set: no spill expected.
            (Some(1), false),
        ];

        for (fetch, expect_spillage) in test_options {
            let session_config = SessionConfig::new();
            let sort_spill_reservation_bytes = session_config
                .options()
                .execution
                .sort_spill_reservation_bytes;

            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(
                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                    1.0,
                )
                .build_arc()?;
            let task_ctx = Arc::new(
                TaskContext::default()
                    .with_runtime(runtime)
                    .with_session_config(session_config),
            );

            let csv = test::scan_partitioned(partitions);
            let schema = csv.schema();

            let sort_exec = Arc::new(
                SortExec::new(
                    [PhysicalSortExpr {
                        expr: col("i", &schema)?,
                        options: SortOptions::default(),
                    }]
                    .into(),
                    Arc::new(CoalescePartitionsExec::new(csv)),
                )
                .with_fetch(fetch),
            );

            let result =
                collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
            assert_eq!(result.len(), 1);

            let metrics = sort_exec.metrics().unwrap();
            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
        }
        Ok(())
    }
1855
    /// As the sorted stream emits batches, the sort's memory reservation
    /// must shrink (or stay flat) and reach zero once fully consumed.
    #[tokio::test]
    async fn test_sort_memory_reduction_per_batch() -> Result<()> {
        // Small output batch size so one input batch becomes many outputs.
        let batch_size = 50;
        let num_rows = 1000;
        let task_ctx = Arc::new(
            TaskContext::default().with_session_config(
                SessionConfig::new()
                    .with_batch_size(batch_size)
                    // Maximum threshold disables a fast path so batches are
                    // released incrementally as they are emitted.
                    .with_sort_in_place_threshold_bytes(usize::MAX),
            ),
        );

        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // Reverse-ordered input so the sort has real work to do.
        let mut values: Vec<i32> = (0..num_rows).collect();
        values.reverse();

        let input_batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values))],
        )?;

        let batches = vec![input_batch];

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: Arc::new(Column::new("a", 0)),
                options: SortOptions::default(),
            }]
            .into(),
            TestMemoryExec::try_new_exec(
                std::slice::from_ref(&batches),
                Arc::clone(&schema),
                None,
            )?,
        ));

        let mut stream = sort_exec.execute(0, Arc::clone(&task_ctx))?;

        let mut previous_reserved = task_ctx.runtime_env().memory_pool.reserved();
        let mut batch_count = 0;

        while let Some(result) = stream.next().await {
            let batch = result?;
            batch_count += 1;

            assert!(batch.num_rows() > 0, "Batch should not be empty");

            let current_reserved = task_ctx.runtime_env().memory_pool.reserved();

            // Skip the first batch: the reservation may legitimately grow
            // while the sort is still buffering before emission begins.
            if batch_count > 1 {
                assert!(
                    current_reserved <= previous_reserved,
                    "Memory reservation should decrease or stay same as batches are emitted. \
                    Batch {batch_count}: previous={previous_reserved}, current={current_reserved}"
                );
            }

            previous_reserved = current_reserved;
        }

        assert!(
            batch_count > 1,
            "Expected multiple batches to be emitted, got {batch_count}"
        );

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "All memory should be returned after consuming all batches"
        );

        Ok(())
    }
1942
1943 #[tokio::test]
1944 async fn test_sort_metadata() -> Result<()> {
1945 let task_ctx = Arc::new(TaskContext::default());
1946 let field_metadata: HashMap<String, String> =
1947 vec![("foo".to_string(), "bar".to_string())]
1948 .into_iter()
1949 .collect();
1950 let schema_metadata: HashMap<String, String> =
1951 vec![("baz".to_string(), "barf".to_string())]
1952 .into_iter()
1953 .collect();
1954
1955 let mut field = Field::new("field_name", DataType::UInt64, true);
1956 field.set_metadata(field_metadata.clone());
1957 let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1958 let schema = Arc::new(schema);
1959
1960 let data: ArrayRef =
1961 Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1962
1963 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1964 let input =
1965 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1966
1967 let sort_exec = Arc::new(SortExec::new(
1968 [PhysicalSortExpr {
1969 expr: col("field_name", &schema)?,
1970 options: SortOptions::default(),
1971 }]
1972 .into(),
1973 input,
1974 ));
1975
1976 let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1977
1978 let expected_data: ArrayRef =
1979 Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1980 let expected_batch =
1981 RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
1982
1983 assert_eq!(&vec![expected_batch], &result);
1985
1986 assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
1988 assert_eq!(result[0].schema().metadata(), &schema_metadata);
1989
1990 Ok(())
1991 }
1992
    /// Lexicographic sort over mixed column types: an `Int32` key (ASC,
    /// nulls first) with a `List<Int32>` tiebreaker (DESC, nulls last).
    #[tokio::test]
    async fn test_lex_sort_by_mixed_types() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
        ]));

        // Two rows share a == 2, so their relative order is decided by the
        // descending list column `b`.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(3)]),
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            [
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: false,
                    },
                },
            ]
            .into(),
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
        ));

        // Sorting must not alter the output schema.
        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
        assert_eq!(
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            *sort_exec.schema().field(1).data_type()
        );

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 4);
        assert_eq!(result.len(), 1);

        // Null `a` first; for the tied a == 2 rows, [5] precedes [3]
        // because `b` is descending.
        let expected = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                    Some(vec![Some(3)]),
                ])),
            ],
        )?;

        assert_eq!(expected, result[0]);

        Ok(())
    }
2070
    /// Lexicographic sort over float columns, exercising NaN and null
    /// placement: `a` DESC with nulls first, `b` ASC with nulls last.
    #[tokio::test]
    async fn test_lex_sort_by_float() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, true),
            Field::new("b", DataType::Float64, true),
        ]));

        // Input mixes NaN, null, and ordinary values in both columns.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![
                    Some(f32::NAN),
                    None,
                    None,
                    Some(f32::NAN),
                    Some(1.0_f32),
                    Some(1.0_f32),
                    Some(2.0_f32),
                    Some(3.0_f32),
                ])),
                Arc::new(Float64Array::from(vec![
                    Some(200.0_f64),
                    Some(20.0_f64),
                    Some(10.0_f64),
                    Some(100.0_f64),
                    Some(f64::NAN),
                    None,
                    None,
                    Some(f64::NAN),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            [
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                },
            ]
            .into(),
            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
        ));

        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 8);
        assert_eq!(result.len(), 1);

        let columns = result[0].columns();

        assert_eq!(DataType::Float32, *columns[0].data_type());
        assert_eq!(DataType::Float64, *columns[1].data_type());

        let a = as_primitive_array::<Float32Type>(&columns[0])?;
        let b = as_primitive_array::<Float64Type>(&columns[1])?;

        // Convert to strings so NaN values compare readably in asserts.
        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
            .map(|i| {
                let aval = if a.is_valid(i) {
                    Some(a.value(i).to_string())
                } else {
                    None
                };
                let bval = if b.is_valid(i) {
                    Some(b.value(i).to_string())
                } else {
                    None
                };
                (aval, bval)
            })
            .collect();

        // Expected order shows nulls first on `a` (descending) and NaN
        // sorting above all numeric values in both columns.
        let expected: Vec<(Option<String>, Option<String>)> = vec![
            (None, Some("10".to_owned())),
            (None, Some("20".to_owned())),
            (Some("NaN".to_owned()), Some("100".to_owned())),
            (Some("NaN".to_owned()), Some("200".to_owned())),
            (Some("3".to_owned()), Some("NaN".to_owned())),
            (Some("2".to_owned()), None),
            (Some("1".to_owned()), Some("NaN".to_owned())),
            (Some("1".to_owned()), None),
        ];

        assert_eq!(expected, result);

        Ok(())
    }
2177
2178 #[tokio::test]
2179 async fn test_drop_cancel() -> Result<()> {
2180 let task_ctx = Arc::new(TaskContext::default());
2181 let schema =
2182 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2183
2184 let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2185 let refs = blocking_exec.refs();
2186 let sort_exec = Arc::new(SortExec::new(
2187 [PhysicalSortExpr {
2188 expr: col("a", &schema)?,
2189 options: SortOptions::default(),
2190 }]
2191 .into(),
2192 blocking_exec,
2193 ));
2194
2195 let fut = collect(sort_exec, Arc::clone(&task_ctx));
2196 let mut fut = fut.boxed();
2197
2198 assert_is_pending(&mut fut);
2199 drop(fut);
2200 assert_strong_count_converges_to_zero(refs).await;
2201
2202 assert_eq!(
2203 task_ctx.runtime_env().memory_pool.reserved(),
2204 0,
2205 "The sort should have returned all memory used back to the memory manager"
2206 );
2207
2208 Ok(())
2209 }
2210
2211 #[test]
2212 fn test_empty_sort_batch() {
2213 let schema = Arc::new(Schema::empty());
2214 let options = RecordBatchOptions::new().with_row_count(Some(1));
2215 let batch =
2216 RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
2217 .unwrap();
2218
2219 let expressions = [PhysicalSortExpr {
2220 expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2221 options: SortOptions::default(),
2222 }]
2223 .into();
2224
2225 let result = sort_batch(&batch, &expressions, None).unwrap();
2226 assert_eq!(result.num_rows(), 1);
2227 }
2228
    /// A sort with a `fetch` limit (TopK) over an infinite input must
    /// terminate once the first `fetch` rows are known to be final.
    #[tokio::test]
    async fn topk_unbounded_source() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
        // The unbounded source emits ever-increasing u64 values, two per batch.
        let source = SortedUnboundedExec {
            schema: schema.clone(),
            batch_size: 2,
            cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new(
                schema.clone(),
            ))),
        };
        let mut plan = SortExec::new(
            [PhysicalSortExpr::new_default(Arc::new(Column::new(
                "c1", 0,
            )))]
            .into(),
            Arc::new(source),
        );
        // Without the fetch this could never finish: the input is infinite.
        plan = plan.with_fetch(Some(9));

        let batches = collect(Arc::new(plan), task_ctx).await?;
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+
        | c1 |
        +----+
        | 0 |
        | 1 |
        | 2 |
        | 3 |
        | 4 |
        | 5 |
        | 6 |
        | 7 |
        | 8 |
        +----+
        ");
        Ok(())
    }
2267
2268 #[tokio::test]
2269 async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2270 let batch_size = 100;
2271
2272 let create_task_ctx = |_: &[RecordBatch]| {
2273 TaskContext::default().with_session_config(
2274 SessionConfig::new()
2275 .with_batch_size(batch_size)
2276 .with_sort_in_place_threshold_bytes(usize::MAX),
2277 )
2278 };
2279
2280 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2282
2283 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2285
2286 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2288
2289 Ok(())
2290 }
2291
2292 #[tokio::test]
2293 async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place()
2294 -> Result<()> {
2295 let batch_size = 100;
2296
2297 let create_task_ctx = |_: &[RecordBatch]| {
2298 TaskContext::default().with_session_config(
2299 SessionConfig::new()
2300 .with_batch_size(batch_size)
2301 .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2302 )
2303 };
2304
2305 {
2307 let metrics =
2308 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2309
2310 assert_eq!(
2311 metrics.spill_count(),
2312 Some(0),
2313 "Expected no spills when sorting in place"
2314 );
2315 }
2316
2317 {
2319 let metrics =
2320 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2321
2322 assert_eq!(
2323 metrics.spill_count(),
2324 Some(0),
2325 "Expected no spills when sorting in place"
2326 );
2327 }
2328
2329 {
2331 let metrics =
2332 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2333
2334 assert_eq!(
2335 metrics.spill_count(),
2336 Some(0),
2337 "Expected no spills when sorting in place"
2338 );
2339 }
2340
2341 Ok(())
2342 }
2343
2344 #[tokio::test]
2345 async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch()
2346 -> Result<()> {
2347 let batch_size = 100;
2348
2349 let create_task_ctx = |_: &[RecordBatch]| {
2350 TaskContext::default()
2351 .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2352 };
2353
2354 {
2356 let metrics = test_sort_output_batch_size(
2357 1,
2359 batch_size / 4,
2360 create_task_ctx,
2361 )
2362 .await?;
2363
2364 assert_eq!(
2365 metrics.spill_count(),
2366 Some(0),
2367 "Expected no spills when sorting in place"
2368 );
2369 }
2370
2371 {
2373 let metrics = test_sort_output_batch_size(
2374 1,
2376 batch_size + 7,
2377 create_task_ctx,
2378 )
2379 .await?;
2380
2381 assert_eq!(
2382 metrics.spill_count(),
2383 Some(0),
2384 "Expected no spills when sorting in place"
2385 );
2386 }
2387
2388 {
2390 let metrics = test_sort_output_batch_size(
2391 1,
2393 batch_size * 3,
2394 create_task_ctx,
2395 )
2396 .await?;
2397
2398 assert_eq!(
2399 metrics.spill_count(),
2400 Some(0),
2401 "Expected no spills when sorting in place"
2402 );
2403 }
2404
2405 Ok(())
2406 }
2407
2408 #[tokio::test]
2409 async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill()
2410 -> Result<()> {
2411 let batch_size = 100;
2412
2413 let create_task_ctx = |generated_batches: &[RecordBatch]| {
2414 let batches_memory = generated_batches
2415 .iter()
2416 .map(|b| b.get_array_memory_size())
2417 .sum::<usize>();
2418
2419 TaskContext::default()
2420 .with_session_config(
2421 SessionConfig::new()
2422 .with_batch_size(batch_size)
2423 .with_sort_in_place_threshold_bytes(1)
2425 .with_sort_spill_reservation_bytes(1),
2426 )
2427 .with_runtime(
2428 RuntimeEnvBuilder::default()
2429 .with_memory_limit(batches_memory, 1.0)
2430 .build_arc()
2431 .unwrap(),
2432 )
2433 };
2434
2435 {
2437 let metrics =
2438 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2439
2440 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2441 }
2442
2443 {
2445 let metrics =
2446 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2447
2448 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2449 }
2450
2451 {
2453 let metrics =
2454 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2455
2456 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2457 }
2458
2459 Ok(())
2460 }
2461
2462 async fn test_sort_output_batch_size(
2463 number_of_batches: usize,
2464 batch_size_to_generate: usize,
2465 create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2466 ) -> Result<MetricsSet> {
2467 let batches = (0..number_of_batches)
2468 .map(|_| make_partition(batch_size_to_generate as i32))
2469 .collect::<Vec<_>>();
2470 let task_ctx = create_task_ctx(batches.as_slice());
2471
2472 let expected_batch_size = task_ctx.session_config().batch_size();
2473
2474 let (mut output_batches, metrics) =
2475 run_sort_on_input(task_ctx, "i", batches).await?;
2476
2477 let last_batch = output_batches.pop().unwrap();
2478
2479 for batch in output_batches {
2480 assert_eq!(batch.num_rows(), expected_batch_size);
2481 }
2482
2483 let mut last_expected_batch_size =
2484 (batch_size_to_generate * number_of_batches) % expected_batch_size;
2485 if last_expected_batch_size == 0 {
2486 last_expected_batch_size = expected_batch_size;
2487 }
2488 assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2489
2490 Ok(metrics)
2491 }
2492
2493 async fn run_sort_on_input(
2494 task_ctx: TaskContext,
2495 order_by_col: &str,
2496 batches: Vec<RecordBatch>,
2497 ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2498 let task_ctx = Arc::new(task_ctx);
2499
2500 let schema = batches[0].schema();
2502 let ordering: LexOrdering = [PhysicalSortExpr {
2503 expr: col(order_by_col, &schema)?,
2504 options: SortOptions {
2505 descending: false,
2506 nulls_first: true,
2507 },
2508 }]
2509 .into();
2510 let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2511 ordering.clone(),
2512 TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2513 ));
2514
2515 let sorted_batches =
2516 collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2517
2518 let metrics = sort_exec.metrics().expect("sort have metrics");
2519
2520 {
2522 let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2523 let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2524
2525 let sorted_batches_concat =
2526 concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2527
2528 assert_eq!(sorted_input_batch, sorted_batches_concat);
2529 }
2530
2531 Ok((sorted_batches, metrics))
2532 }
2533
2534 #[tokio::test]
2535 async fn test_sort_batch_chunked_basic() -> Result<()> {
2536 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2537
2538 let mut values: Vec<i32> = (0..1000).collect();
2540 values.reverse();
2542
2543 let batch = RecordBatch::try_new(
2544 Arc::clone(&schema),
2545 vec![Arc::new(Int32Array::from(values))],
2546 )?;
2547
2548 let expressions: LexOrdering =
2549 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2550
2551 let result_batches = sort_batch_chunked(&batch, &expressions, 250)?;
2553
2554 assert_eq!(result_batches.len(), 4);
2556
2557 let mut total_rows = 0;
2559 for (i, batch) in result_batches.iter().enumerate() {
2560 assert!(
2561 batch.num_rows() <= 250,
2562 "Batch {} has {} rows, expected <= 250",
2563 i,
2564 batch.num_rows()
2565 );
2566 total_rows += batch.num_rows();
2567 }
2568
2569 assert_eq!(total_rows, 1000);
2571
2572 let concatenated = concat_batches(&schema, &result_batches)?;
2574 let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2575 for i in 0..array.len() - 1 {
2576 assert!(
2577 array.value(i) <= array.value(i + 1),
2578 "Array not sorted at position {}: {} > {}",
2579 i,
2580 array.value(i),
2581 array.value(i + 1)
2582 );
2583 }
2584 assert_eq!(array.value(0), 0);
2585 assert_eq!(array.value(array.len() - 1), 999);
2586
2587 Ok(())
2588 }
2589
2590 #[tokio::test]
2591 async fn test_sort_batch_chunked_smaller_than_batch_size() -> Result<()> {
2592 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2593
2594 let values: Vec<i32> = (0..50).rev().collect();
2596 let batch = RecordBatch::try_new(
2597 Arc::clone(&schema),
2598 vec![Arc::new(Int32Array::from(values))],
2599 )?;
2600
2601 let expressions: LexOrdering =
2602 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2603
2604 let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2606
2607 assert_eq!(result_batches.len(), 1);
2609 assert_eq!(result_batches[0].num_rows(), 50);
2610
2611 let array = as_primitive_array::<Int32Type>(result_batches[0].column(0))?;
2613 for i in 0..array.len() - 1 {
2614 assert!(array.value(i) <= array.value(i + 1));
2615 }
2616 assert_eq!(array.value(0), 0);
2617 assert_eq!(array.value(49), 49);
2618
2619 Ok(())
2620 }
2621
2622 #[tokio::test]
2623 async fn test_sort_batch_chunked_exact_multiple() -> Result<()> {
2624 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2625
2626 let values: Vec<i32> = (0..1000).rev().collect();
2628 let batch = RecordBatch::try_new(
2629 Arc::clone(&schema),
2630 vec![Arc::new(Int32Array::from(values))],
2631 )?;
2632
2633 let expressions: LexOrdering =
2634 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2635
2636 let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2638
2639 assert_eq!(result_batches.len(), 10);
2641 for batch in &result_batches {
2642 assert_eq!(batch.num_rows(), 100);
2643 }
2644
2645 let concatenated = concat_batches(&schema, &result_batches)?;
2647 let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2648 for i in 0..array.len() - 1 {
2649 assert!(array.value(i) <= array.value(i + 1));
2650 }
2651
2652 Ok(())
2653 }
2654
2655 #[tokio::test]
2656 async fn test_sort_batch_chunked_empty_batch() -> Result<()> {
2657 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2658
2659 let batch = RecordBatch::new_empty(Arc::clone(&schema));
2660
2661 let expressions: LexOrdering =
2662 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2663
2664 let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2665
2666 assert_eq!(result_batches.len(), 0);
2668
2669 Ok(())
2670 }
2671
2672 #[tokio::test]
2673 async fn test_get_reserved_bytes_for_record_batch_with_sliced_batches() -> Result<()>
2674 {
2675 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2676
2677 let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
2679 let sliced_array = large_array.slice(100, 50); let sliced_batch =
2682 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(sliced_array)])?;
2683 let batch =
2684 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(large_array)])?;
2685
2686 let sliced_reserved = get_reserved_bytes_for_record_batch(&sliced_batch)?;
2687 let reserved = get_reserved_bytes_for_record_batch(&batch)?;
2688
2689 assert!(reserved > sliced_reserved);
2691
2692 Ok(())
2693 }
2694}