1use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28
29use crate::common::spawn_buffered;
30use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
31use crate::expressions::PhysicalSortExpr;
32use crate::filter_pushdown::{
33 ChildFilterDescription, FilterDescription, FilterPushdownPhase,
34};
35use crate::limit::LimitStream;
36use crate::metrics::{
37 BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
38};
39use crate::projection::{ProjectionExec, make_with_child, update_ordering};
40use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
41use crate::spill::get_record_batch_memory_size;
42use crate::spill::in_progress_spill_file::InProgressSpillFile;
43use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
44use crate::stream::RecordBatchStreamAdapter;
45use crate::stream::ReservationStream;
46use crate::topk::TopK;
47use crate::topk::TopKDynamicFilters;
48use crate::{
49 DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
50 ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
51 Statistics,
52};
53
54use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
55use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
56use arrow::datatypes::SchemaRef;
57use datafusion_common::config::SpillCompression;
58use datafusion_common::{
59 DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
60 unwrap_or_internal_err,
61};
62use datafusion_execution::TaskContext;
63use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
64use datafusion_execution::runtime_env::RuntimeEnv;
65use datafusion_physical_expr::LexOrdering;
66use datafusion_physical_expr::PhysicalExpr;
67use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
68
69use futures::{StreamExt, TryStreamExt};
70use log::{debug, trace};
71
/// Metrics tracked by an [`ExternalSorter`]: the common baseline set plus
/// spill-specific counters.
struct ExternalSorterMetrics {
    /// Common operator metrics (output rows, elapsed compute, etc.)
    baseline: BaselineMetrics,

    /// Counters for spill activity (files, bytes and rows spilled)
    spill_metrics: SpillMetrics,
}
78
impl ExternalSorterMetrics {
    /// Register baseline and spill metrics for `partition` in `metrics`.
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            spill_metrics: SpillMetrics::new(metrics, partition),
        }
    }
}
87
/// Sorts an arbitrary amount of input record batches, buffering them in
/// memory and spilling sorted runs to disk when the memory budget is
/// exceeded; spilled runs are merged back together on output.
struct ExternalSorter {
    /// Schema of the input (and output) batches
    schema: SchemaRef,
    /// Sort expressions defining the requested ordering
    expr: LexOrdering,
    /// Target number of rows per output batch
    batch_size: usize,
    /// If buffered data is smaller than this, sort it as one concatenated
    /// batch instead of merging per-batch sorted streams
    sort_in_place_threshold_bytes: usize,

    /// Unsorted input batches buffered in memory
    in_mem_batches: Vec<RecordBatch>,

    /// Spill file currently being written, paired with the largest sliced
    /// batch size appended to it so far
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    /// Completed spill files, each containing one fully sorted run
    finished_spill_files: Vec<SortedSpillFile>,

    /// Execution metrics for this sorter
    metrics: ExternalSorterMetrics,
    /// Runtime environment (memory pool, disk manager, ...)
    runtime: Arc<RuntimeEnv>,
    /// Reservation covering the memory held by `in_mem_batches`
    reservation: MemoryReservation,
    /// Creates and tracks spill files
    spill_manager: SpillManager,

    /// Reservation pre-acquired for the merge phase
    merge_reservation: MemoryReservation,
    /// Bytes to pre-reserve for merging (the `sort_spill_reservation_bytes`
    /// configuration value)
    sort_spill_reservation_bytes: usize,
}
261
262impl ExternalSorter {
263 #[expect(clippy::too_many_arguments)]
    /// Create a new sorter for the given partition.
    ///
    /// Registers two memory consumers with the runtime's pool: a spillable
    /// one for buffered input batches, and a separate one pre-reserved for
    /// the merge phase.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        spill_compression: SpillCompression,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<Self> {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        // Main reservation: marked spillable so the pool may ask this
        // consumer to free memory under pressure.
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);

        // Merge-phase reservation (not spillable).
        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);

        let spill_manager = SpillManager::new(
            Arc::clone(&runtime),
            metrics.spill_metrics.clone(),
            Arc::clone(&schema),
        )
        .with_compression_type(spill_compression);

        Ok(Self {
            schema,
            in_mem_batches: vec![],
            in_progress_spill_file: None,
            finished_spill_files: vec![],
            expr,
            metrics,
            reservation,
            spill_manager,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        })
    }
310
    /// Buffer `input` in memory, reserving memory for it first; reserving may
    /// trigger a sort-and-spill of the already-buffered batches.
    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
        // Empty batches carry no data worth buffering.
        if input.num_rows() == 0 {
            return Ok(());
        }

        self.reserve_memory_for_merge()?;
        self.reserve_memory_for_batch_and_maybe_spill(&input)
            .await?;

        self.in_mem_batches.push(input);
        Ok(())
    }
326
327 fn spilled_before(&self) -> bool {
328 !self.finished_spill_files.is_empty()
329 }
330
    /// Produce the final sorted output stream.
    ///
    /// If any runs were spilled, the remaining in-memory batches are sorted
    /// and spilled as well, and all spill files are combined via a streaming
    /// merge. Otherwise the buffered batches are sorted directly in memory.
    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
        // Release the merge pre-reservation; the merge below accounts for
        // its memory through a fresh reservation.
        self.merge_reservation.free();

        if self.spilled_before() {
            if !self.in_mem_batches.is_empty() {
                self.sort_and_spill_in_mem_batches().await?;
            }

            StreamingMergeBuilder::new()
                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
                .with_spill_manager(self.spill_manager.clone())
                .with_schema(Arc::clone(&self.schema))
                // NOTE: the clone avoids borrowing `self.expr` across the
                // mutable borrow taken by `mem::take` above.
                .with_expressions(&self.expr.clone())
                .with_metrics(self.metrics.baseline.clone())
                .with_batch_size(self.batch_size)
                .with_fetch(None)
                .with_reservation(self.merge_reservation.new_empty())
                .build()
        } else {
            self.in_mem_sort_stream(self.metrics.baseline.clone())
        }
    }
368
    /// Bytes of memory currently reserved by this sorter.
    fn used(&self) -> usize {
        self.reservation.size()
    }

    /// Total bytes spilled to disk so far.
    fn spilled_bytes(&self) -> usize {
        self.metrics.spill_metrics.spilled_bytes.value()
    }

    /// Total rows spilled to disk so far.
    fn spilled_rows(&self) -> usize {
        self.metrics.spill_metrics.spilled_rows.value()
    }

    /// Number of spill files created so far.
    fn spill_count(&self) -> usize {
        self.metrics.spill_metrics.spill_file_count.value()
    }
388
    /// Append `globally_sorted_batches` to the in-progress spill file,
    /// consuming (and emptying) the vector and releasing its memory
    /// reservation.
    ///
    /// Lazily opens a spill file on first use, compacts `StringView`
    /// columns before writing, and tracks the largest sliced batch size
    /// appended so far (used later to size merge-time memory).
    async fn consume_and_spill_append(
        &mut self,
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        if globally_sorted_batches.is_empty() {
            return Ok(());
        }

        // Lazily initialize the in-progress spill file; the second tuple
        // element is the running maximum sliced batch size.
        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
        }

        Self::organize_stringview_arrays(globally_sorted_batches)?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let batches_to_spill = std::mem::take(globally_sorted_batches);
        // The batches are leaving memory, so release the whole reservation.
        self.reservation.free();

        let (in_progress_file, max_record_batch_size) =
            self.in_progress_spill_file.as_mut().ok_or_else(|| {
                internal_datafusion_err!("In-progress spill file should be initialized")
            })?;

        for batch in batches_to_spill {
            in_progress_file.append_batch(&batch)?;

            *max_record_batch_size =
                (*max_record_batch_size).max(batch.get_sliced_size()?);
        }

        assert_or_internal_err!(
            globally_sorted_batches.is_empty(),
            "This function consumes globally_sorted_batches, so it should be empty after taking."
        );

        Ok(())
    }
431
    /// Finalize the in-progress spill file and record it as a finished,
    /// fully-sorted spill run.
    ///
    /// # Errors
    ///
    /// Returns an internal error if no spill is in progress (i.e.
    /// `consume_and_spill_append` was never called).
    async fn spill_finish(&mut self) -> Result<()> {
        let (mut in_progress_file, max_record_batch_memory) =
            self.in_progress_spill_file.take().ok_or_else(|| {
                internal_datafusion_err!("Should be called after `spill_append`")
            })?;
        let spill_file = in_progress_file.finish()?;

        // `finish()` yields `None` when nothing was actually written.
        if let Some(spill_file) = spill_file {
            self.finished_spill_files.push(SortedSpillFile {
                file: spill_file,
                max_record_batch_memory,
            });
        }

        Ok(())
    }
449
450 fn organize_stringview_arrays(
480 globally_sorted_batches: &mut Vec<RecordBatch>,
481 ) -> Result<()> {
482 let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
483
484 for batch in globally_sorted_batches.drain(..) {
485 let mut new_columns: Vec<Arc<dyn Array>> =
486 Vec::with_capacity(batch.num_columns());
487
488 let mut arr_mutated = false;
489 for array in batch.columns() {
490 if let Some(string_view_array) =
491 array.as_any().downcast_ref::<StringViewArray>()
492 {
493 let new_array = string_view_array.gc();
494 new_columns.push(Arc::new(new_array));
495 arr_mutated = true;
496 } else {
497 new_columns.push(Arc::clone(array));
498 }
499 }
500
501 let organized_batch = if arr_mutated {
502 RecordBatch::try_new(batch.schema(), new_columns)?
503 } else {
504 batch
505 };
506
507 organized_batches.push(organized_batch);
508 }
509
510 *globally_sorted_batches = organized_batches;
511
512 Ok(())
513 }
514
    /// Sort the buffered in-memory batches into a single run and spill it to
    /// disk, releasing their memory.
    ///
    /// Sorted batches stream out one at a time; they accumulate in memory
    /// while the reservation allows and are flushed to the spill file
    /// whenever it does not.
    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
        assert_or_internal_err!(
            !self.in_mem_batches.is_empty(),
            "in_mem_batches must not be empty when attempting to sort and spill"
        );

        // Release the merge pre-reservation so the in-memory sort can use
        // that memory; it is re-acquired at the end.
        self.merge_reservation.free();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
        // `in_mem_sort_stream` takes ownership of the buffered batches.
        assert_or_internal_err!(
            self.in_mem_batches.is_empty(),
            "in_mem_batches should be empty after constructing sorted stream"
        );
        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
            if self.reservation.try_grow(sorted_size).is_err() {
                // Out of memory: flush everything accumulated so far,
                // including this batch, to the spill file.
                globally_sorted_batches.push(batch);
                self.consume_and_spill_append(&mut globally_sorted_batches)
                    .await?;
            } else {
                globally_sorted_batches.push(batch);
            }
        }

        // Drop the stream before the final flush so any memory it still
        // holds is released first.
        drop(sorted_stream);

        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?;
        self.spill_finish().await?;

        let buffers_cleared_property =
            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
        assert_or_internal_err!(
            buffers_cleared_property,
            "in_mem_batches and globally_sorted_batches should be cleared before"
        );

        // Re-acquire the merge-phase reservation released above.
        self.reserve_memory_for_merge()?;

        Ok(())
    }
578
    /// Consume `in_mem_batches` and return a stream of sorted batches.
    ///
    /// Strategy, in order:
    /// 1. no batches -> empty stream;
    /// 2. exactly one batch -> sort it directly;
    /// 3. total reserved size below `sort_in_place_threshold_bytes` ->
    ///    concatenate everything and sort as one batch;
    /// 4. otherwise -> sort each batch independently and streaming-merge the
    ///    sorted runs.
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }

        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();

        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            // Hand the entire reservation to the stream; it is released as
            // the sorted batches are emitted.
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            // Re-size the reservation to the concatenated batch's footprint
            // (the original batches are gone).
            self.reservation
                .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
                .map_err(Self::err_with_oom_context)?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        // Sort each batch on a buffered task, splitting off a per-batch
        // slice of the reservation, then merge the sorted streams.
        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                let reservation = self
                    .reservation
                    .split(get_reserved_bytes_for_record_batch(&batch)?);
                let input = self.sort_batch_stream(batch, &metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;

        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(&self.expr.clone())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(None)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }
698
    /// Sort a single batch lazily on a stream, transferring `reservation`
    /// (which must exactly cover the batch) into the resulting stream.
    ///
    /// The sort runs when the stream is first polled; the reservation is
    /// resized to the sorted output's size and released as it is consumed.
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: &BaselineMetrics,
        mut reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        // Invariant: the caller passes a reservation matching the batch.
        assert_eq!(
            get_reserved_bytes_for_record_batch(&batch)?,
            reservation.size()
        );

        let schema = batch.schema();
        let expressions = self.expr.clone();
        let batch_size = self.batch_size;
        let output_row_metrics = metrics.output_rows().clone();

        let stream = futures::stream::once(async move {
            let schema = batch.schema();

            let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
            // Free the unsorted input before adjusting the reservation.
            drop(batch);

            // Resize the reservation to the sorted batches' actual
            // in-memory size.
            let total_sorted_size: usize = sorted_batches
                .iter()
                .map(get_record_batch_memory_size)
                .sum();
            reservation
                .try_resize(total_sorted_size)
                .map_err(Self::err_with_oom_context)?;

            // `ReservationStream` shrinks the reservation as batches flow
            // downstream.
            Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
                Arc::clone(&schema),
                Box::pin(RecordBatchStreamAdapter::new(
                    schema,
                    futures::stream::iter(sorted_batches.into_iter().map(Ok)),
                )),
                reservation,
            )) as SendableRecordBatchStream)
        })
        .try_flatten()
        .map(move |batch| match batch {
            Ok(batch) => {
                output_row_metrics.add(batch.num_rows());
                Ok(batch)
            }
            Err(e) => Err(e),
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
764
765 fn reserve_memory_for_merge(&mut self) -> Result<()> {
769 if self.runtime.disk_manager.tmp_files_enabled() {
771 let size = self.sort_spill_reservation_bytes;
772 if self.merge_reservation.size() != size {
773 self.merge_reservation
774 .try_resize(size)
775 .map_err(Self::err_with_oom_context)?;
776 }
777 }
778
779 Ok(())
780 }
781
    /// Reserve memory for `input`; if the pool is exhausted, sort and spill
    /// the already-buffered batches and retry once.
    ///
    /// # Errors
    ///
    /// Returns a resources-exhausted error (with tuning hints attached) if
    /// even a single batch cannot be reserved.
    async fn reserve_memory_for_batch_and_maybe_spill(
        &mut self,
        input: &RecordBatch,
    ) -> Result<()> {
        let size = get_reserved_bytes_for_record_batch(input)?;

        match self.reservation.try_grow(size) {
            Ok(_) => Ok(()),
            Err(e) => {
                // Nothing buffered to spill -- give up immediately.
                if self.in_mem_batches.is_empty() {
                    return Err(Self::err_with_oom_context(e));
                }

                // Spilling frees the reservation; retry the grow once.
                self.sort_and_spill_in_mem_batches().await?;
                self.reservation
                    .try_grow(size)
                    .map_err(Self::err_with_oom_context)
            }
        }
    }
805
806 fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
809 match e {
810 DataFusionError::ResourcesExhausted(_) => e.context(
811 "Not enough memory to continue external sort. \
812 Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
813 ),
814 _ => e,
816 }
817 }
818}
819
/// Number of bytes to reserve for a record batch, given its in-memory size
/// and its sliced size: the reservation is the sum of the two.
pub(crate) fn get_reserved_bytes_for_record_batch_size(
    record_batch_size: usize,
    sliced_size: usize,
) -> usize {
    sliced_size + record_batch_size
}
839
/// Memory to reserve for `batch`: its in-memory size plus its sliced size,
/// combined by [`get_reserved_bytes_for_record_batch_size`].
pub(super) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result<usize> {
    Ok(get_reserved_bytes_for_record_batch_size(
        get_record_batch_memory_size(batch),
        batch.get_sliced_size()?,
    ))
}
849
impl Debug for ExternalSorter {
    /// Summarize memory usage and spill statistics (schema and sort
    /// expressions are intentionally omitted).
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("ExternalSorter")
            .field("memory_used", &self.used())
            .field("spilled_bytes", &self.spilled_bytes())
            .field("spilled_rows", &self.spilled_rows())
            .field("spill_count", &self.spill_count())
            .finish()
    }
}
860
861pub fn sort_batch(
862 batch: &RecordBatch,
863 expressions: &LexOrdering,
864 fetch: Option<usize>,
865) -> Result<RecordBatch> {
866 let sort_columns = expressions
867 .iter()
868 .map(|expr| expr.evaluate_to_sort_column(batch))
869 .collect::<Result<Vec<_>>>()?;
870
871 let indices = lexsort_to_indices(&sort_columns, fetch)?;
872 let columns = take_arrays(batch.columns(), &indices, None)?;
873
874 let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
875 Ok(RecordBatch::try_new_with_options(
876 batch.schema(),
877 columns,
878 &options,
879 )?)
880}
881
882pub fn sort_batch_chunked(
886 batch: &RecordBatch,
887 expressions: &LexOrdering,
888 batch_size: usize,
889) -> Result<Vec<RecordBatch>> {
890 let sort_columns = expressions
891 .iter()
892 .map(|expr| expr.evaluate_to_sort_column(batch))
893 .collect::<Result<Vec<_>>>()?;
894
895 let indices = lexsort_to_indices(&sort_columns, None)?;
896
897 let num_rows = indices.len();
899 let num_chunks = num_rows.div_ceil(batch_size);
900
901 let result_batches = (0..num_chunks)
902 .map(|chunk_idx| {
903 let start = chunk_idx * batch_size;
904 let end = (start + batch_size).min(num_rows);
905 let chunk_len = end - start;
906
907 let chunk_indices = indices.slice(start, chunk_len);
909
910 let columns = take_arrays(batch.columns(), &chunk_indices, None)?;
912
913 let options = RecordBatchOptions::new().with_row_count(Some(chunk_len));
914 let chunk_batch =
915 RecordBatch::try_new_with_options(batch.schema(), columns, &options)?;
916
917 Ok(chunk_batch)
918 })
919 .collect::<Result<Vec<RecordBatch>>>()?;
920
921 Ok(result_batches)
922}
923
/// Sort execution plan: sorts its input, optionally keeping only the first
/// `fetch` rows (TopK mode).
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input plan whose output is sorted
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions defining the output ordering
    expr: LexOrdering,
    /// Execution metrics, shared across partitions
    metrics_set: ExecutionPlanMetricsSet,
    /// Sort each input partition independently (`true`) or merge everything
    /// into a single sorted output partition (`false`)
    preserve_partitioning: bool,
    /// Optional row limit: emit only the first `fetch` sorted rows (TopK)
    fetch: Option<usize>,
    /// Longest prefix of `expr` already satisfied by the input's ordering
    common_sort_prefix: Vec<PhysicalSortExpr>,
    /// Cached plan properties (equivalences, partitioning, emission, ...)
    cache: PlanProperties,
    /// Dynamic filter used for TopK pushdown; `None` when no fetch is set
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
950
951impl SortExec {
952 pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
955 let preserve_partitioning = false;
956 let (cache, sort_prefix) =
957 Self::compute_properties(&input, expr.clone(), preserve_partitioning)
958 .unwrap();
959 Self {
960 expr,
961 input,
962 metrics_set: ExecutionPlanMetricsSet::new(),
963 preserve_partitioning,
964 fetch: None,
965 common_sort_prefix: sort_prefix,
966 cache,
967 filter: None,
968 }
969 }
970
    /// Whether each input partition is sorted independently.
    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    /// Set partition-preservation mode: `true` sorts every input partition
    /// on its own, `false` produces a single sorted output partition.
    /// The cached output partitioning is updated to match.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        self.cache = self
            .cache
            .with_partitioning(Self::output_partitioning_helper(
                &self.input,
                self.preserve_partitioning,
            ));
        self
    }
993
994 fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
996 let children = self
997 .expr
998 .iter()
999 .map(|sort_expr| Arc::clone(&sort_expr.expr))
1000 .collect::<Vec<_>>();
1001 Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
1002 DynamicFilterPhysicalExpr::new(children, lit(true)),
1003 ))))
1004 }
1005
    /// Field-by-field copy of this node.
    ///
    /// NOTE(review): the struct also derives `Clone`; this explicit version
    /// names every field, presumably so a newly added field produces a
    /// compile error here — confirm before replacing with `self.clone()`.
    fn cloned(&self) -> Self {
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            common_sort_prefix: self.common_sort_prefix.clone(),
            fetch: self.fetch,
            cache: self.cache.clone(),
            filter: self.filter.clone(),
        }
    }
1018
    /// Return a copy of this node with the given row limit (`fetch`).
    ///
    /// Setting a limit over an incremental input makes the node bounded
    /// (TopK needs only finite state). A dynamic filter is reused if one
    /// exists, created otherwise, and dropped when the limit is cleared.
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = self.cache.clone();
        // TopK over an incrementally-emitting input yields a bounded result
        // even when the input itself is unbounded.
        let is_pipeline_friendly = matches!(
            self.cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        // Reuse the existing filter when present so its current state is
        // kept; otherwise create a fresh pass-everything filter.
        let filter = fetch.is_some().then(|| {
            self.filter.clone().unwrap_or_else(|| self.create_filter())
        });
        let mut new_sort = self.cloned();
        new_sort.fetch = fetch;
        new_sort.cache = cache;
        new_sort.filter = filter;
        new_sort
    }
1048
    /// Input execution plan.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions describing the requested output ordering.
    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    /// Optional row limit (TopK mode when `Some`).
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }
1063
1064 fn output_partitioning_helper(
1065 input: &Arc<dyn ExecutionPlan>,
1066 preserve_partitioning: bool,
1067 ) -> Partitioning {
1068 if preserve_partitioning {
1070 input.output_partitioning().clone()
1071 } else {
1072 Partitioning::UnknownPartitioning(1)
1073 }
1074 }
1075
    /// Compute the cached plan properties, plus the longest prefix of
    /// `sort_exprs` that the input's existing ordering already satisfies.
    ///
    /// When the input fully satisfies the ordering the node can stream
    /// (its emission type and boundedness pass through); otherwise all
    /// input must be buffered before emitting, which on unbounded input
    /// requires infinite memory.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
        let (sort_prefix, sort_satisfied) = input
            .equivalence_properties()
            .extract_common_sort_prefix(sort_exprs.clone())?;

        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };

        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };

        // The output ordering becomes the requested sort expressions.
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.reorder(sort_exprs)?;

        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);

        Ok((
            PlanProperties::new(
                eq_properties,
                output_partitioning,
                emission_type,
                boundedness,
            ),
            sort_prefix,
        ))
    }
1131}
1132
impl DisplayAs for SortExec {
    /// Human-readable rendering. TopK mode additionally shows the fetch
    /// count, any non-trivial dynamic filter, and the pre-sorted prefix.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let preserve_partitioning = self.preserve_partitioning;
                match self.fetch {
                    Some(fetch) => {
                        write!(
                            f,
                            "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                            self.expr
                        )?;
                        // Only show the filter once it has tightened beyond
                        // the initial always-true literal.
                        if let Some(filter) = &self.filter
                            && let Ok(current) = filter.read().expr().current()
                            && !current.eq(&lit(true))
                        {
                            write!(f, ", filter=[{current}]")?;
                        }
                        if !self.common_sort_prefix.is_empty() {
                            write!(f, ", sort_prefix=[")?;
                            let mut first = true;
                            for sort_expr in &self.common_sort_prefix {
                                if first {
                                    first = false;
                                } else {
                                    write!(f, ", ")?;
                                }
                                write!(f, "{sort_expr}")?;
                            }
                            write!(f, "]")
                        } else {
                            Ok(())
                        }
                    }
                    None => write!(
                        f,
                        "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                        self.expr
                    ),
                }
            }
            DisplayFormatType::TreeRender => match self.fetch {
                Some(fetch) => {
                    writeln!(f, "{}", self.expr)?;
                    writeln!(f, "limit={fetch}")
                }
                None => {
                    writeln!(f, "{}", self.expr)
                }
            },
        }
    }
}
1186
1187impl ExecutionPlan for SortExec {
1188 fn name(&self) -> &'static str {
1189 match self.fetch {
1190 Some(_) => "SortExec(TopK)",
1191 None => "SortExec",
1192 }
1193 }
1194
    /// Support downcasting to the concrete `SortExec` type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Cached plan properties computed at construction time.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }
1202
1203 fn required_input_distribution(&self) -> Vec<Distribution> {
1204 if self.preserve_partitioning {
1205 vec![Distribution::UnspecifiedDistribution]
1206 } else {
1207 vec![Distribution::SinglePartition]
1210 }
1211 }
1212
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Repartitioning the input does not help this operator.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }
1220
1221 fn with_new_children(
1222 self: Arc<Self>,
1223 children: Vec<Arc<dyn ExecutionPlan>>,
1224 ) -> Result<Arc<dyn ExecutionPlan>> {
1225 let mut new_sort = self.cloned();
1226 assert_eq!(children.len(), 1, "SortExec should have exactly one child");
1227 new_sort.input = Arc::clone(&children[0]);
1228 let (cache, sort_prefix) = Self::compute_properties(
1230 &new_sort.input,
1231 new_sort.expr.clone(),
1232 new_sort.preserve_partitioning,
1233 )?;
1234 new_sort.cache = cache;
1235 new_sort.common_sort_prefix = sort_prefix;
1236
1237 Ok(Arc::new(new_sort))
1238 }
1239
    /// Return a copy of this node with per-execution state reset: a fresh
    /// dynamic filter and a fresh metrics set, for re-executing the plan.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        let new_sort = self.with_new_children(children)?;
        let mut new_sort = new_sort
            .as_any()
            .downcast_ref::<SortExec>()
            .expect("cloned 1 lines above this line, we know the type")
            .clone();
        // Fresh filter and metrics so a re-run starts from a clean slate.
        new_sort.filter = Some(new_sort.create_filter());
        new_sort.metrics_set = ExecutionPlanMetricsSet::new();

        Ok(Arc::new(new_sort))
    }
1254
    /// Execute one partition of the sort.
    ///
    /// Dispatches on (input already sorted?, fetch set?):
    /// - sorted + fetch: apply a plain limit;
    /// - sorted, no fetch: pass the input stream through unchanged;
    /// - unsorted + fetch: TopK (tracks only the best `fetch` rows);
    /// - unsorted, no fetch: full external (spilling) sort.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start SortExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {partition}");

        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy(self.expr.clone())?;

        match (sort_satisfied, self.fetch.as_ref()) {
            // Input already ordered: a plain limit suffices.
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            (true, None) => Ok(input),
            // TopK: consume input, keeping only the best `fetch` rows.
            (false, Some(fetch)) => {
                let filter = self.filter.clone();
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.common_sort_prefix.clone(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                    Arc::clone(&unwrap_or_internal_err!(filter)),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                            // TopK may determine no further input can change
                            // the result and stop reading early.
                            if topk.finished {
                                break;
                            }
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            // Full sort, spilling to disk under memory pressure.
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    context.session_config().spill_compression(),
                    &self.metrics_set,
                    context.runtime_env(),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort().await
                    })
                    .try_flatten(),
                )))
            }
        }
    }
1340
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    /// Plan-wide statistics (all partitions combined).
    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }
1348
1349 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1350 if !self.preserve_partitioning() {
1351 return self
1352 .input
1353 .partition_statistics(None)?
1354 .with_fetch(self.fetch, 0, 1);
1355 }
1356 self.input
1357 .partition_statistics(partition)?
1358 .with_fetch(self.fetch, 0, 1)
1359 }
1360
    /// `ExecutionPlan::with_fetch`: delegates to [`SortExec::with_fetch`].
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
1368
1369 fn cardinality_effect(&self) -> CardinalityEffect {
1370 if self.fetch.is_none() {
1371 CardinalityEffect::Equal
1372 } else {
1373 CardinalityEffect::LowerEqual
1374 }
1375 }
1376
    /// Try pushing `projection` below this sort: worthwhile only when the
    /// projection narrows the schema and every sort expression can be
    /// rewritten in terms of the projected columns.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // A non-narrowing projection saves nothing by being pushed down.
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }

        // Bail out if any sort expression cannot be expressed post-swap.
        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
        else {
            return Ok(None);
        };

        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }
1400
    /// Describe which parent filters may be pushed to the child; in the
    /// post-optimization phase (and when enabled in the config) this node's
    /// own TopK dynamic filter is attached as a self filter.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &datafusion_common::config::ConfigOptions,
    ) -> Result<FilterDescription> {
        if !matches!(phase, FilterPushdownPhase::Post) {
            return FilterDescription::from_children(parent_filters, &self.children());
        }

        let mut child =
            ChildFilterDescription::from_child(&parent_filters, self.input())?;

        if let Some(filter) = &self.filter
            && config.optimizer.enable_topk_dynamic_filter_pushdown
        {
            child = child.with_self_filter(filter.read().expr());
        }

        Ok(FilterDescription::new().with_child(child))
    }
1422}
1423
1424#[cfg(test)]
1425mod tests {
1426 use std::collections::HashMap;
1427 use std::pin::Pin;
1428 use std::task::{Context, Poll};
1429
1430 use super::*;
1431 use crate::coalesce_partitions::CoalescePartitionsExec;
1432 use crate::collect;
1433 use crate::execution_plan::Boundedness;
1434 use crate::expressions::col;
1435 use crate::test;
1436 use crate::test::TestMemoryExec;
1437 use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
1438 use crate::test::{assert_is_pending, make_partition};
1439
1440 use arrow::array::*;
1441 use arrow::compute::SortOptions;
1442 use arrow::datatypes::*;
1443 use datafusion_common::cast::as_primitive_array;
1444 use datafusion_common::test_util::batches_to_string;
1445 use datafusion_common::{DataFusionError, Result, ScalarValue};
1446 use datafusion_execution::RecordBatchStream;
1447 use datafusion_execution::config::SessionConfig;
1448 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1449 use datafusion_physical_expr::EquivalenceProperties;
1450 use datafusion_physical_expr::expressions::{Column, Literal};
1451
1452 use futures::{FutureExt, Stream};
1453 use insta::assert_snapshot;
1454
    /// Test source producing an endless, already-sorted stream of `u64`s.
    #[derive(Debug, Clone)]
    pub struct SortedUnboundedExec {
        schema: Schema,
        // Rows emitted per poll of the stream
        batch_size: u64,
        cache: PlanProperties,
    }
1461
1462 impl DisplayAs for SortedUnboundedExec {
1463 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1464 match t {
1465 DisplayFormatType::Default
1466 | DisplayFormatType::Verbose
1467 | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1468 }
1469 Ok(())
1470 }
1471 }
1472
    impl SortedUnboundedExec {
        /// Properties for the test source: one unknown partition, ordered by
        /// column `c1`, unbounded without requiring infinite memory.
        fn compute_properties(schema: SchemaRef) -> PlanProperties {
            let mut eq_properties = EquivalenceProperties::new(schema);
            eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
                Column::new("c1", 0),
            ))]);
            PlanProperties::new(
                eq_properties,
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            )
        }
    }
1489
    impl ExecutionPlan for SortedUnboundedExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.cache
        }

        /// Leaf node: no children.
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }

        /// Start an endless sorted stream beginning at offset 0.
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            Ok(Box::pin(SortedUnboundedStream {
                schema: Arc::new(self.schema.clone()),
                batch_size: self.batch_size,
                offset: 0,
            }))
        }
    }
1526
    /// Infinite stream of monotonically increasing `u64` batches.
    #[derive(Debug)]
    pub struct SortedUnboundedStream {
        schema: SchemaRef,
        // Rows per emitted batch
        batch_size: u64,
        // Next value to emit; advanced by `batch_size` on each poll
        offset: u64,
    }
1533
    impl Stream for SortedUnboundedStream {
        type Item = Result<RecordBatch>;

        /// Always ready: emits the next `batch_size` consecutive values and
        /// never terminates.
        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            let batch = SortedUnboundedStream::create_record_batch(
                Arc::clone(&self.schema),
                self.offset,
                self.batch_size,
            );
            self.offset += self.batch_size;
            Poll::Ready(Some(Ok(batch)))
        }
    }
1550
    impl RecordBatchStream for SortedUnboundedStream {
        /// All emitted batches share the same fixed schema.
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }
1556
1557 impl SortedUnboundedStream {
1558 fn create_record_batch(
1559 schema: SchemaRef,
1560 offset: u64,
1561 batch_size: u64,
1562 ) -> RecordBatch {
1563 let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1564 let array = UInt64Array::from(values);
1565 let array_ref: ArrayRef = Arc::new(array);
1566 RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1567 }
1568 }
1569
1570 #[tokio::test]
1571 async fn test_in_mem_sort() -> Result<()> {
1572 let task_ctx = Arc::new(TaskContext::default());
1573 let partitions = 4;
1574 let csv = test::scan_partitioned(partitions);
1575 let schema = csv.schema();
1576
1577 let sort_exec = Arc::new(SortExec::new(
1578 [PhysicalSortExpr {
1579 expr: col("i", &schema)?,
1580 options: SortOptions::default(),
1581 }]
1582 .into(),
1583 Arc::new(CoalescePartitionsExec::new(csv)),
1584 ));
1585
1586 let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
1587
1588 assert_eq!(result.len(), 1);
1589 assert_eq!(result[0].num_rows(), 400);
1590 assert_eq!(
1591 task_ctx.runtime_env().memory_pool.reserved(),
1592 0,
1593 "The sort should have returned all memory used back to the memory manager"
1594 );
1595
1596 Ok(())
1597 }
1598
#[tokio::test]
async fn test_sort_spill() -> Result<()> {
    // Memory limit = default sort_spill_reservation_bytes plus a small
    // headroom, so the sort cannot hold all input and must spill to disk.
    let session_config = SessionConfig::new();
    let sort_spill_reservation_bytes = session_config
        .options()
        .execution
        .sort_spill_reservation_bytes;
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
        .build_arc()?;
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    );

    // 100 generated partitions, merged into a single partition before sorting.
    let partitions = 100;
    let input = test::scan_partitioned(partitions);
    let schema = input.schema();

    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr {
            expr: col("i", &schema)?,
            options: SortOptions::default(),
        }]
        .into(),
        Arc::new(CoalescePartitionsExec::new(input)),
    ));

    let result = collect(
        Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
        Arc::clone(&task_ctx),
    )
    .await?;

    assert_eq!(result.len(), 2);

    let metrics = sort_exec.metrics().unwrap();

    assert_eq!(metrics.output_rows().unwrap(), 10000);
    assert!(metrics.elapsed_compute().unwrap() > 0);

    // Spill metrics are asserted as ranges rather than exact values —
    // presumably the precise counts vary with allocator/runtime details.
    let spill_count = metrics.spill_count().unwrap();
    let spilled_rows = metrics.spilled_rows().unwrap();
    let spilled_bytes = metrics.spilled_bytes().unwrap();
    assert!((3..=10).contains(&spill_count));
    assert!((9000..=10000).contains(&spilled_rows));
    assert!((38000..=44000).contains(&spilled_bytes));

    // Spot-check the first output batch's ordering boundaries.
    let columns = result[0].columns();

    let i = as_primitive_array::<Int32Type>(&columns[0])?;
    assert_eq!(i.value(0), 0);
    assert_eq!(i.value(i.len() - 1), 81);
    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "The sort should have returned all memory used back to the memory manager"
    );

    Ok(())
}
1669
1670 #[tokio::test]
1671 async fn test_batch_reservation_error() -> Result<()> {
1672 let merge_reservation: usize = 0; let session_config =
1676 SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
1677
1678 let plan = test::scan_partitioned(1);
1679
1680 let expected_batch_reservation = {
1682 let temp_ctx = Arc::new(TaskContext::default());
1683 let mut stream = plan.execute(0, Arc::clone(&temp_ctx))?;
1684 let first_batch = stream.next().await.unwrap()?;
1685 get_reserved_bytes_for_record_batch(&first_batch)?
1686 };
1687
1688 let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;
1690
1691 let runtime = RuntimeEnvBuilder::new()
1692 .with_memory_limit(memory_limit, 1.0)
1693 .build_arc()?;
1694 let task_ctx = Arc::new(
1695 TaskContext::default()
1696 .with_session_config(session_config)
1697 .with_runtime(runtime),
1698 );
1699
1700 {
1702 let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
1703 let first_batch = stream.next().await.unwrap()?;
1704 let batch_reservation = get_reserved_bytes_for_record_batch(&first_batch)?;
1705
1706 assert_eq!(batch_reservation, expected_batch_reservation);
1707 assert!(memory_limit < (merge_reservation + batch_reservation));
1708 }
1709
1710 let sort_exec = Arc::new(SortExec::new(
1711 [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
1712 plan,
1713 ));
1714
1715 let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;
1716
1717 let err = result.unwrap_err();
1718 assert!(
1719 matches!(err, DataFusionError::Context(..)),
1720 "Assertion failed: expected a Context error, but got: {err:?}"
1721 );
1722
1723 assert!(
1725 matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
1726 "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
1727 );
1728
1729 Ok(())
1730 }
1731
#[tokio::test]
async fn test_sort_spill_utf8_strings() -> Result<()> {
    // Small output batches plus a 500 KiB memory limit force the sort of
    // string data to spill to disk.
    let session_config = SessionConfig::new()
        .with_batch_size(100)
        .with_sort_in_place_threshold_bytes(20 * 1024)
        .with_sort_spill_reservation_bytes(100 * 1024);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(500 * 1024, 1.0)
        .build_arc()?;
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    );

    // 200 partitions of generated utf8 data, merged before sorting.
    let input = test::scan_partitioned_utf8(200);
    let schema = input.schema();

    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr {
            expr: col("i", &schema)?,
            options: SortOptions::default(),
        }]
        .into(),
        Arc::new(CoalescePartitionsExec::new(input)),
    ));

    let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;

    let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
    assert_eq!(num_rows, 20000);

    let metrics = sort_exec.metrics().unwrap();

    assert_eq!(metrics.output_rows().unwrap(), 20000);
    assert!(metrics.elapsed_compute().unwrap() > 0);

    let spill_count = metrics.spill_count().unwrap();
    let spilled_rows = metrics.spilled_rows().unwrap();
    let spilled_bytes = metrics.spilled_bytes().unwrap();

    // Spill metrics are asserted as ranges rather than exact values —
    // presumably the precise counts vary with allocator/runtime details.
    assert!((4..=8).contains(&spill_count));
    assert!((15000..=20000).contains(&spilled_rows));
    assert!((900000..=1000000).contains(&spilled_bytes));

    // The concatenated output must be sorted lexicographically end to end.
    let concated_result = concat_batches(&schema, &result)?;
    let columns = concated_result.columns();
    let string_array = as_string_array(&columns[0]);
    for i in 0..string_array.len() - 1 {
        assert!(string_array.value(i) <= string_array.value(i + 1));
    }

    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "The sort should have returned all memory used back to the memory manager"
    );

    Ok(())
}
1810
#[tokio::test]
async fn test_sort_fetch_memory_calculation() -> Result<()> {
    // Approximate size of one input batch, used to size the memory limit
    // below — TODO confirm against `scan_partitioned`'s actual batch size.
    let avg_batch_size = 400;
    let partitions = 4;

    // (fetch, expect_spillage):
    // - no fetch: the limit below cannot hold all partitions, spill expected;
    // - fetch = 1: only a bounded prefix must be retained, no spill expected.
    let test_options = vec![
        (None, true),
        (Some(1), false),
    ];

    for (fetch, expect_spillage) in test_options {
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;

        // Enough memory for the spill reservation plus all but one batch.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(
                sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                1.0,
            )
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_runtime(runtime)
                .with_session_config(session_config),
        );

        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        let sort_exec = Arc::new(
            SortExec::new(
                [PhysicalSortExpr {
                    expr: col("i", &schema)?,
                    options: SortOptions::default(),
                }]
                .into(),
                Arc::new(CoalescePartitionsExec::new(csv)),
            )
            .with_fetch(fetch),
        );

        let result =
            collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
        assert_eq!(result.len(), 1);

        let metrics = sort_exec.metrics().unwrap();
        let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
        assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
    }
    Ok(())
}
1871
1872 #[tokio::test]
1873 async fn test_sort_memory_reduction_per_batch() -> Result<()> {
1874 let batch_size = 50; let num_rows = 1000; let task_ctx = Arc::new(
1883 TaskContext::default().with_session_config(
1884 SessionConfig::new()
1885 .with_batch_size(batch_size)
1886 .with_sort_in_place_threshold_bytes(usize::MAX), ),
1888 );
1889
1890 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1891
1892 let mut values: Vec<i32> = (0..num_rows).collect();
1894 values.reverse();
1895
1896 let input_batch = RecordBatch::try_new(
1897 Arc::clone(&schema),
1898 vec![Arc::new(Int32Array::from(values))],
1899 )?;
1900
1901 let batches = vec![input_batch];
1902
1903 let sort_exec = Arc::new(SortExec::new(
1904 [PhysicalSortExpr {
1905 expr: Arc::new(Column::new("a", 0)),
1906 options: SortOptions::default(),
1907 }]
1908 .into(),
1909 TestMemoryExec::try_new_exec(
1910 std::slice::from_ref(&batches),
1911 Arc::clone(&schema),
1912 None,
1913 )?,
1914 ));
1915
1916 let mut stream = sort_exec.execute(0, Arc::clone(&task_ctx))?;
1917
1918 let mut previous_reserved = task_ctx.runtime_env().memory_pool.reserved();
1919 let mut batch_count = 0;
1920
1921 while let Some(result) = stream.next().await {
1923 let batch = result?;
1924 batch_count += 1;
1925
1926 assert!(batch.num_rows() > 0, "Batch should not be empty");
1928
1929 let current_reserved = task_ctx.runtime_env().memory_pool.reserved();
1930
1931 if batch_count > 1 {
1934 assert!(
1935 current_reserved <= previous_reserved,
1936 "Memory reservation should decrease or stay same as batches are emitted. \
1937 Batch {batch_count}: previous={previous_reserved}, current={current_reserved}"
1938 );
1939 }
1940
1941 previous_reserved = current_reserved;
1942 }
1943
1944 assert!(
1945 batch_count > 1,
1946 "Expected multiple batches to be emitted, got {batch_count}"
1947 );
1948
1949 assert_eq!(
1951 task_ctx.runtime_env().memory_pool.reserved(),
1952 0,
1953 "All memory should be returned after consuming all batches"
1954 );
1955
1956 Ok(())
1957 }
1958
1959 #[tokio::test]
1960 async fn test_sort_metadata() -> Result<()> {
1961 let task_ctx = Arc::new(TaskContext::default());
1962 let field_metadata: HashMap<String, String> =
1963 vec![("foo".to_string(), "bar".to_string())]
1964 .into_iter()
1965 .collect();
1966 let schema_metadata: HashMap<String, String> =
1967 vec![("baz".to_string(), "barf".to_string())]
1968 .into_iter()
1969 .collect();
1970
1971 let mut field = Field::new("field_name", DataType::UInt64, true);
1972 field.set_metadata(field_metadata.clone());
1973 let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1974 let schema = Arc::new(schema);
1975
1976 let data: ArrayRef =
1977 Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1978
1979 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1980 let input =
1981 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1982
1983 let sort_exec = Arc::new(SortExec::new(
1984 [PhysicalSortExpr {
1985 expr: col("field_name", &schema)?,
1986 options: SortOptions::default(),
1987 }]
1988 .into(),
1989 input,
1990 ));
1991
1992 let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1993
1994 let expected_data: ArrayRef =
1995 Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1996 let expected_batch =
1997 RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
1998
1999 assert_eq!(&vec![expected_batch], &result);
2001
2002 assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
2004 assert_eq!(result[0].schema().metadata(), &schema_metadata);
2005
2006 Ok(())
2007 }
2008
#[tokio::test]
async fn test_lex_sort_by_mixed_types() -> Result<()> {
    // Two-key lexicographic sort over mixed types: Int32 key ascending with
    // nulls first, List<Int32> tie-breaker descending with nulls last.
    let task_ctx = Arc::new(TaskContext::default());
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new(
            "b",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            true,
        ),
    ]));

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(3)]),
                Some(vec![Some(1)]),
                Some(vec![Some(6), None]),
                Some(vec![Some(5)]),
            ])),
        ],
    )?;

    let sort_exec = Arc::new(SortExec::new(
        [
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions {
                    descending: false,
                    nulls_first: true,
                },
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions {
                    descending: true,
                    nulls_first: false,
                },
            },
        ]
        .into(),
        TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
    ));

    // Sorting must not change the output schema.
    assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
    assert_eq!(
        DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
        *sort_exec.schema().field(1).data_type()
    );

    let result: Vec<RecordBatch> =
        collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
    let metrics = sort_exec.metrics().unwrap();
    assert!(metrics.elapsed_compute().unwrap() > 0);
    assert_eq!(metrics.output_rows().unwrap(), 4);
    assert_eq!(result.len(), 1);

    // Expected: null `a` first, then 1, then the a=2 tie broken by `b`
    // descending ([5] before [3]).
    let expected = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1)]),
                Some(vec![Some(6), None]),
                Some(vec![Some(5)]),
                Some(vec![Some(3)]),
            ])),
        ],
    )?;

    assert_eq!(expected, result[0]);

    Ok(())
}
2086
#[tokio::test]
async fn test_lex_sort_by_float() -> Result<()> {
    // Two-key sort over floats containing NaN and null: `a` Float32
    // descending with nulls first, `b` Float64 ascending with nulls last.
    // The expected output below shows NaN ordering above all numeric values.
    let task_ctx = Arc::new(TaskContext::default());
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Float32, true),
        Field::new("b", DataType::Float64, true),
    ]));

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Float32Array::from(vec![
                Some(f32::NAN),
                None,
                None,
                Some(f32::NAN),
                Some(1.0_f32),
                Some(1.0_f32),
                Some(2.0_f32),
                Some(3.0_f32),
            ])),
            Arc::new(Float64Array::from(vec![
                Some(200.0_f64),
                Some(20.0_f64),
                Some(10.0_f64),
                Some(100.0_f64),
                Some(f64::NAN),
                None,
                None,
                Some(f64::NAN),
            ])),
        ],
    )?;

    let sort_exec = Arc::new(SortExec::new(
        [
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions {
                    descending: true,
                    nulls_first: true,
                },
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions {
                    descending: false,
                    nulls_first: false,
                },
            },
        ]
        .into(),
        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
    ));

    // Sorting must not change the output schema.
    assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
    assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

    let result: Vec<RecordBatch> =
        collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
    let metrics = sort_exec.metrics().unwrap();
    assert!(metrics.elapsed_compute().unwrap() > 0);
    assert_eq!(metrics.output_rows().unwrap(), 8);
    assert_eq!(result.len(), 1);

    let columns = result[0].columns();

    assert_eq!(DataType::Float32, *columns[0].data_type());
    assert_eq!(DataType::Float64, *columns[1].data_type());

    let a = as_primitive_array::<Float32Type>(&columns[0])?;
    let b = as_primitive_array::<Float64Type>(&columns[1])?;

    // Convert each value to its string form so NaN can be compared for
    // equality against the expected rows (NaN != NaN as a float).
    let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
        .map(|i| {
            let aval = if a.is_valid(i) {
                Some(a.value(i).to_string())
            } else {
                None
            };
            let bval = if b.is_valid(i) {
                Some(b.value(i).to_string())
            } else {
                None
            };
            (aval, bval)
        })
        .collect();

    // Order on `a`: nulls first, then NaN, then 3, 2, 1, 1; the a=1 tie is
    // broken by `b` ascending with nulls last (NaN before null).
    let expected: Vec<(Option<String>, Option<String>)> = vec![
        (None, Some("10".to_owned())),
        (None, Some("20".to_owned())),
        (Some("NaN".to_owned()), Some("100".to_owned())),
        (Some("NaN".to_owned()), Some("200".to_owned())),
        (Some("3".to_owned()), Some("NaN".to_owned())),
        (Some("2".to_owned()), None),
        (Some("1".to_owned()), Some("NaN".to_owned())),
        (Some("1".to_owned()), None),
    ];

    assert_eq!(expected, result);

    Ok(())
}
2193
2194 #[tokio::test]
2195 async fn test_drop_cancel() -> Result<()> {
2196 let task_ctx = Arc::new(TaskContext::default());
2197 let schema =
2198 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2199
2200 let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2201 let refs = blocking_exec.refs();
2202 let sort_exec = Arc::new(SortExec::new(
2203 [PhysicalSortExpr {
2204 expr: col("a", &schema)?,
2205 options: SortOptions::default(),
2206 }]
2207 .into(),
2208 blocking_exec,
2209 ));
2210
2211 let fut = collect(sort_exec, Arc::clone(&task_ctx));
2212 let mut fut = fut.boxed();
2213
2214 assert_is_pending(&mut fut);
2215 drop(fut);
2216 assert_strong_count_converges_to_zero(refs).await;
2217
2218 assert_eq!(
2219 task_ctx.runtime_env().memory_pool.reserved(),
2220 0,
2221 "The sort should have returned all memory used back to the memory manager"
2222 );
2223
2224 Ok(())
2225 }
2226
#[test]
fn test_empty_sort_batch() {
    // A batch with a declared row count but zero columns.
    let schema = Arc::new(Schema::empty());
    let options = RecordBatchOptions::new().with_row_count(Some(1));
    let batch =
        RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
            .unwrap();

    let sort_by = [PhysicalSortExpr {
        expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
        options: SortOptions::default(),
    }]
    .into();

    // Sorting must preserve the row count even with nothing to compare.
    let sorted = sort_batch(&batch, &sort_by, None).unwrap();
    assert_eq!(sorted.num_rows(), 1);
}
2244
#[tokio::test]
async fn topk_unbounded_source() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
    // Infinite, already-sorted source emitting 2-row batches.
    let source = SortedUnboundedExec {
        schema: schema.clone(),
        batch_size: 2,
        cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
    };
    let mut plan = SortExec::new(
        [PhysicalSortExpr::new_default(Arc::new(Column::new(
            "c1", 0,
        )))]
        .into(),
        Arc::new(source),
    );
    // The fetch limit lets collection terminate even though the input
    // never ends: only the first 9 values are needed.
    plan = plan.with_fetch(Some(9));

    let batches = collect(Arc::new(plan), task_ctx).await?;
    assert_snapshot!(batches_to_string(&batches), @r"
    +----+
    | c1 |
    +----+
    | 0 |
    | 1 |
    | 2 |
    | 3 |
    | 4 |
    | 5 |
    | 6 |
    | 7 |
    | 8 |
    +----+
    ");
    Ok(())
}
2281
2282 #[tokio::test]
2283 async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2284 let batch_size = 100;
2285
2286 let create_task_ctx = |_: &[RecordBatch]| {
2287 TaskContext::default().with_session_config(
2288 SessionConfig::new()
2289 .with_batch_size(batch_size)
2290 .with_sort_in_place_threshold_bytes(usize::MAX),
2291 )
2292 };
2293
2294 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2296
2297 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2299
2300 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2302
2303 Ok(())
2304 }
2305
2306 #[tokio::test]
2307 async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place()
2308 -> Result<()> {
2309 let batch_size = 100;
2310
2311 let create_task_ctx = |_: &[RecordBatch]| {
2312 TaskContext::default().with_session_config(
2313 SessionConfig::new()
2314 .with_batch_size(batch_size)
2315 .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2316 )
2317 };
2318
2319 {
2321 let metrics =
2322 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2323
2324 assert_eq!(
2325 metrics.spill_count(),
2326 Some(0),
2327 "Expected no spills when sorting in place"
2328 );
2329 }
2330
2331 {
2333 let metrics =
2334 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2335
2336 assert_eq!(
2337 metrics.spill_count(),
2338 Some(0),
2339 "Expected no spills when sorting in place"
2340 );
2341 }
2342
2343 {
2345 let metrics =
2346 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2347
2348 assert_eq!(
2349 metrics.spill_count(),
2350 Some(0),
2351 "Expected no spills when sorting in place"
2352 );
2353 }
2354
2355 Ok(())
2356 }
2357
2358 #[tokio::test]
2359 async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch()
2360 -> Result<()> {
2361 let batch_size = 100;
2362
2363 let create_task_ctx = |_: &[RecordBatch]| {
2364 TaskContext::default()
2365 .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2366 };
2367
2368 {
2370 let metrics = test_sort_output_batch_size(
2371 1,
2373 batch_size / 4,
2374 create_task_ctx,
2375 )
2376 .await?;
2377
2378 assert_eq!(
2379 metrics.spill_count(),
2380 Some(0),
2381 "Expected no spills when sorting in place"
2382 );
2383 }
2384
2385 {
2387 let metrics = test_sort_output_batch_size(
2388 1,
2390 batch_size + 7,
2391 create_task_ctx,
2392 )
2393 .await?;
2394
2395 assert_eq!(
2396 metrics.spill_count(),
2397 Some(0),
2398 "Expected no spills when sorting in place"
2399 );
2400 }
2401
2402 {
2404 let metrics = test_sort_output_batch_size(
2405 1,
2407 batch_size * 3,
2408 create_task_ctx,
2409 )
2410 .await?;
2411
2412 assert_eq!(
2413 metrics.spill_count(),
2414 Some(0),
2415 "Expected no spills when sorting in place"
2416 );
2417 }
2418
2419 Ok(())
2420 }
2421
2422 #[tokio::test]
2423 async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill()
2424 -> Result<()> {
2425 let batch_size = 100;
2426
2427 let create_task_ctx = |generated_batches: &[RecordBatch]| {
2428 let batches_memory = generated_batches
2429 .iter()
2430 .map(|b| b.get_array_memory_size())
2431 .sum::<usize>();
2432
2433 TaskContext::default()
2434 .with_session_config(
2435 SessionConfig::new()
2436 .with_batch_size(batch_size)
2437 .with_sort_in_place_threshold_bytes(1)
2439 .with_sort_spill_reservation_bytes(1),
2440 )
2441 .with_runtime(
2442 RuntimeEnvBuilder::default()
2443 .with_memory_limit(batches_memory, 1.0)
2444 .build_arc()
2445 .unwrap(),
2446 )
2447 };
2448
2449 {
2451 let metrics =
2452 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2453
2454 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2455 }
2456
2457 {
2459 let metrics =
2460 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2461
2462 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2463 }
2464
2465 {
2467 let metrics =
2468 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2469
2470 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2471 }
2472
2473 Ok(())
2474 }
2475
2476 async fn test_sort_output_batch_size(
2477 number_of_batches: usize,
2478 batch_size_to_generate: usize,
2479 create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2480 ) -> Result<MetricsSet> {
2481 let batches = (0..number_of_batches)
2482 .map(|_| make_partition(batch_size_to_generate as i32))
2483 .collect::<Vec<_>>();
2484 let task_ctx = create_task_ctx(batches.as_slice());
2485
2486 let expected_batch_size = task_ctx.session_config().batch_size();
2487
2488 let (mut output_batches, metrics) =
2489 run_sort_on_input(task_ctx, "i", batches).await?;
2490
2491 let last_batch = output_batches.pop().unwrap();
2492
2493 for batch in output_batches {
2494 assert_eq!(batch.num_rows(), expected_batch_size);
2495 }
2496
2497 let mut last_expected_batch_size =
2498 (batch_size_to_generate * number_of_batches) % expected_batch_size;
2499 if last_expected_batch_size == 0 {
2500 last_expected_batch_size = expected_batch_size;
2501 }
2502 assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2503
2504 Ok(metrics)
2505 }
2506
/// Sorts `batches` on `order_by_col` (ascending, nulls first) through a
/// `SortExec` over a `TestMemoryExec`, cross-checks the streamed result
/// against a single-shot reference sort, and returns the output batches
/// together with the sort's metrics.
async fn run_sort_on_input(
    task_ctx: TaskContext,
    order_by_col: &str,
    batches: Vec<RecordBatch>,
) -> Result<(Vec<RecordBatch>, MetricsSet)> {
    let task_ctx = Arc::new(task_ctx);

    // All input batches are assumed to share the first batch's schema.
    let schema = batches[0].schema();
    let ordering: LexOrdering = [PhysicalSortExpr {
        expr: col(order_by_col, &schema)?,
        options: SortOptions {
            descending: false,
            nulls_first: true,
        },
    }]
    .into();
    let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
        ordering.clone(),
        TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
    ));

    let sorted_batches =
        collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;

    let metrics = sort_exec.metrics().expect("sort have metrics");

    // Cross-check: concatenating and sorting the input in one shot must
    // equal the concatenation of the streamed sort output.
    {
        let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
        let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;

        let sorted_batches_concat =
            concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;

        assert_eq!(sorted_input_batch, sorted_batches_concat);
    }

    Ok((sorted_batches, metrics))
}
2547
2548 #[tokio::test]
2549 async fn test_sort_batch_chunked_basic() -> Result<()> {
2550 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2551
2552 let mut values: Vec<i32> = (0..1000).collect();
2554 values.reverse();
2556
2557 let batch = RecordBatch::try_new(
2558 Arc::clone(&schema),
2559 vec![Arc::new(Int32Array::from(values))],
2560 )?;
2561
2562 let expressions: LexOrdering =
2563 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2564
2565 let result_batches = sort_batch_chunked(&batch, &expressions, 250)?;
2567
2568 assert_eq!(result_batches.len(), 4);
2570
2571 let mut total_rows = 0;
2573 for (i, batch) in result_batches.iter().enumerate() {
2574 assert!(
2575 batch.num_rows() <= 250,
2576 "Batch {} has {} rows, expected <= 250",
2577 i,
2578 batch.num_rows()
2579 );
2580 total_rows += batch.num_rows();
2581 }
2582
2583 assert_eq!(total_rows, 1000);
2585
2586 let concatenated = concat_batches(&schema, &result_batches)?;
2588 let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2589 for i in 0..array.len() - 1 {
2590 assert!(
2591 array.value(i) <= array.value(i + 1),
2592 "Array not sorted at position {}: {} > {}",
2593 i,
2594 array.value(i),
2595 array.value(i + 1)
2596 );
2597 }
2598 assert_eq!(array.value(0), 0);
2599 assert_eq!(array.value(array.len() - 1), 999);
2600
2601 Ok(())
2602 }
2603
2604 #[tokio::test]
2605 async fn test_sort_batch_chunked_smaller_than_batch_size() -> Result<()> {
2606 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2607
2608 let values: Vec<i32> = (0..50).rev().collect();
2610 let batch = RecordBatch::try_new(
2611 Arc::clone(&schema),
2612 vec![Arc::new(Int32Array::from(values))],
2613 )?;
2614
2615 let expressions: LexOrdering =
2616 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2617
2618 let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2620
2621 assert_eq!(result_batches.len(), 1);
2623 assert_eq!(result_batches[0].num_rows(), 50);
2624
2625 let array = as_primitive_array::<Int32Type>(result_batches[0].column(0))?;
2627 for i in 0..array.len() - 1 {
2628 assert!(array.value(i) <= array.value(i + 1));
2629 }
2630 assert_eq!(array.value(0), 0);
2631 assert_eq!(array.value(49), 49);
2632
2633 Ok(())
2634 }
2635
2636 #[tokio::test]
2637 async fn test_sort_batch_chunked_exact_multiple() -> Result<()> {
2638 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2639
2640 let values: Vec<i32> = (0..1000).rev().collect();
2642 let batch = RecordBatch::try_new(
2643 Arc::clone(&schema),
2644 vec![Arc::new(Int32Array::from(values))],
2645 )?;
2646
2647 let expressions: LexOrdering =
2648 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2649
2650 let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2652
2653 assert_eq!(result_batches.len(), 10);
2655 for batch in &result_batches {
2656 assert_eq!(batch.num_rows(), 100);
2657 }
2658
2659 let concatenated = concat_batches(&schema, &result_batches)?;
2661 let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2662 for i in 0..array.len() - 1 {
2663 assert!(array.value(i) <= array.value(i + 1));
2664 }
2665
2666 Ok(())
2667 }
2668
2669 #[tokio::test]
2670 async fn test_sort_batch_chunked_empty_batch() -> Result<()> {
2671 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2672
2673 let batch = RecordBatch::new_empty(Arc::clone(&schema));
2674
2675 let expressions: LexOrdering =
2676 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2677
2678 let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2679
2680 assert_eq!(result_batches.len(), 0);
2682
2683 Ok(())
2684 }
2685
2686 #[tokio::test]
2687 async fn test_get_reserved_bytes_for_record_batch_with_sliced_batches() -> Result<()>
2688 {
2689 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2690
2691 let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
2693 let sliced_array = large_array.slice(100, 50); let sliced_batch =
2696 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(sliced_array)])?;
2697 let batch =
2698 RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(large_array)])?;
2699
2700 let sliced_reserved = get_reserved_bytes_for_record_batch(&sliced_batch)?;
2701 let reserved = get_reserved_bytes_for_record_batch(&batch)?;
2702
2703 assert!(reserved > sliced_reserved);
2705
2706 Ok(())
2707 }
2708}