1use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28
29use crate::common::spawn_buffered;
30use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
31use crate::expressions::PhysicalSortExpr;
32use crate::filter_pushdown::{
33 ChildFilterDescription, FilterDescription, FilterPushdownPhase,
34};
35use crate::limit::LimitStream;
36use crate::metrics::{
37 BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, SplitMetrics,
38};
39use crate::projection::{make_with_child, update_ordering, ProjectionExec};
40use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
41use crate::spill::get_record_batch_memory_size;
42use crate::spill::in_progress_spill_file::InProgressSpillFile;
43use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
44use crate::stream::BatchSplitStream;
45use crate::stream::RecordBatchStreamAdapter;
46use crate::topk::TopK;
47use crate::topk::TopKDynamicFilters;
48use crate::{
49 DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
50 ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
51 Statistics,
52};
53
54use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
55use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
56use arrow::datatypes::SchemaRef;
57use datafusion_common::config::SpillCompression;
58use datafusion_common::{
59 internal_datafusion_err, internal_err, unwrap_or_internal_err, DataFusionError,
60 Result,
61};
62use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
63use datafusion_execution::runtime_env::RuntimeEnv;
64use datafusion_execution::TaskContext;
65use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
66use datafusion_physical_expr::LexOrdering;
67use datafusion_physical_expr::PhysicalExpr;
68
69use futures::{StreamExt, TryStreamExt};
70use log::{debug, trace};
71
72struct ExternalSorterMetrics {
73 baseline: BaselineMetrics,
75
76 spill_metrics: SpillMetrics,
77
78 split_metrics: SplitMetrics,
79}
80
81impl ExternalSorterMetrics {
82 fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
83 Self {
84 baseline: BaselineMetrics::new(metrics, partition),
85 spill_metrics: SpillMetrics::new(metrics, partition),
86 split_metrics: SplitMetrics::new(metrics, partition),
87 }
88 }
89}
90
91struct ExternalSorter {
209 schema: SchemaRef,
215 expr: LexOrdering,
217 batch_size: usize,
219 sort_in_place_threshold_bytes: usize,
223
224 in_mem_batches: Vec<RecordBatch>,
230
231 in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
238 finished_spill_files: Vec<SortedSpillFile>,
243
244 metrics: ExternalSorterMetrics,
250 runtime: Arc<RuntimeEnv>,
252 reservation: MemoryReservation,
254 spill_manager: SpillManager,
255
256 merge_reservation: MemoryReservation,
260 sort_spill_reservation_bytes: usize,
263}
264
265impl ExternalSorter {
266 #[allow(clippy::too_many_arguments)]
269 pub fn new(
270 partition_id: usize,
271 schema: SchemaRef,
272 expr: LexOrdering,
273 batch_size: usize,
274 sort_spill_reservation_bytes: usize,
275 sort_in_place_threshold_bytes: usize,
276 spill_compression: SpillCompression,
278 metrics: &ExecutionPlanMetricsSet,
279 runtime: Arc<RuntimeEnv>,
280 ) -> Result<Self> {
281 let metrics = ExternalSorterMetrics::new(metrics, partition_id);
282 let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
283 .with_can_spill(true)
284 .register(&runtime.memory_pool);
285
286 let merge_reservation =
287 MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
288 .register(&runtime.memory_pool);
289
290 let spill_manager = SpillManager::new(
291 Arc::clone(&runtime),
292 metrics.spill_metrics.clone(),
293 Arc::clone(&schema),
294 )
295 .with_compression_type(spill_compression);
296
297 Ok(Self {
298 schema,
299 in_mem_batches: vec![],
300 in_progress_spill_file: None,
301 finished_spill_files: vec![],
302 expr,
303 metrics,
304 reservation,
305 spill_manager,
306 merge_reservation,
307 runtime,
308 batch_size,
309 sort_spill_reservation_bytes,
310 sort_in_place_threshold_bytes,
311 })
312 }
313
314 async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
318 if input.num_rows() == 0 {
319 return Ok(());
320 }
321
322 self.reserve_memory_for_merge()?;
323 self.reserve_memory_for_batch_and_maybe_spill(&input)
324 .await?;
325
326 self.in_mem_batches.push(input);
327 Ok(())
328 }
329
330 fn spilled_before(&self) -> bool {
331 !self.finished_spill_files.is_empty()
332 }
333
334 async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
344 self.merge_reservation.free();
348
349 if self.spilled_before() {
350 if !self.in_mem_batches.is_empty() {
354 self.sort_and_spill_in_mem_batches().await?;
355 }
356
357 StreamingMergeBuilder::new()
358 .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
359 .with_spill_manager(self.spill_manager.clone())
360 .with_schema(Arc::clone(&self.schema))
361 .with_expressions(&self.expr.clone())
362 .with_metrics(self.metrics.baseline.clone())
363 .with_batch_size(self.batch_size)
364 .with_fetch(None)
365 .with_reservation(self.merge_reservation.new_empty())
366 .build()
367 } else {
368 self.in_mem_sort_stream(self.metrics.baseline.clone())
369 }
370 }
371
372 fn used(&self) -> usize {
374 self.reservation.size()
375 }
376
377 fn spilled_bytes(&self) -> usize {
379 self.metrics.spill_metrics.spilled_bytes.value()
380 }
381
382 fn spilled_rows(&self) -> usize {
384 self.metrics.spill_metrics.spilled_rows.value()
385 }
386
387 fn spill_count(&self) -> usize {
389 self.metrics.spill_metrics.spill_file_count.value()
390 }
391
392 async fn consume_and_spill_append(
395 &mut self,
396 globally_sorted_batches: &mut Vec<RecordBatch>,
397 ) -> Result<()> {
398 if globally_sorted_batches.is_empty() {
399 return Ok(());
400 }
401
402 if self.in_progress_spill_file.is_none() {
404 self.in_progress_spill_file =
405 Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
406 }
407
408 Self::organize_stringview_arrays(globally_sorted_batches)?;
409
410 debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
411
412 let batches_to_spill = std::mem::take(globally_sorted_batches);
413 self.reservation.free();
414
415 let (in_progress_file, max_record_batch_size) =
416 self.in_progress_spill_file.as_mut().ok_or_else(|| {
417 internal_datafusion_err!("In-progress spill file should be initialized")
418 })?;
419
420 for batch in batches_to_spill {
421 in_progress_file.append_batch(&batch)?;
422
423 *max_record_batch_size =
424 (*max_record_batch_size).max(batch.get_sliced_size()?);
425 }
426
427 if !globally_sorted_batches.is_empty() {
428 return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
429 }
430
431 Ok(())
432 }
433
434 async fn spill_finish(&mut self) -> Result<()> {
436 let (mut in_progress_file, max_record_batch_memory) =
437 self.in_progress_spill_file.take().ok_or_else(|| {
438 internal_datafusion_err!("Should be called after `spill_append`")
439 })?;
440 let spill_file = in_progress_file.finish()?;
441
442 if let Some(spill_file) = spill_file {
443 self.finished_spill_files.push(SortedSpillFile {
444 file: spill_file,
445 max_record_batch_memory,
446 });
447 }
448
449 Ok(())
450 }
451
452 fn organize_stringview_arrays(
482 globally_sorted_batches: &mut Vec<RecordBatch>,
483 ) -> Result<()> {
484 let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
485
486 for batch in globally_sorted_batches.drain(..) {
487 let mut new_columns: Vec<Arc<dyn Array>> =
488 Vec::with_capacity(batch.num_columns());
489
490 let mut arr_mutated = false;
491 for array in batch.columns() {
492 if let Some(string_view_array) =
493 array.as_any().downcast_ref::<StringViewArray>()
494 {
495 let new_array = string_view_array.gc();
496 new_columns.push(Arc::new(new_array));
497 arr_mutated = true;
498 } else {
499 new_columns.push(Arc::clone(array));
500 }
501 }
502
503 let organized_batch = if arr_mutated {
504 RecordBatch::try_new(batch.schema(), new_columns)?
505 } else {
506 batch
507 };
508
509 organized_batches.push(organized_batch);
510 }
511
512 *globally_sorted_batches = organized_batches;
513
514 Ok(())
515 }
516
517 async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
520 if self.in_mem_batches.is_empty() {
521 return internal_err!(
522 "in_mem_batches must not be empty when attempting to sort and spill"
523 );
524 }
525
526 self.merge_reservation.free();
531
532 let mut sorted_stream =
533 self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
534 if !self.in_mem_batches.is_empty() {
537 return internal_err!(
538 "in_mem_batches should be empty after constructing sorted stream"
539 );
540 }
541 let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
545
546 while let Some(batch) = sorted_stream.next().await {
547 let batch = batch?;
548 let sorted_size = get_reserved_byte_for_record_batch(&batch);
549 if self.reservation.try_grow(sorted_size).is_err() {
550 globally_sorted_batches.push(batch);
554 self.consume_and_spill_append(&mut globally_sorted_batches)
555 .await?; } else {
557 globally_sorted_batches.push(batch);
558 }
559 }
560
561 drop(sorted_stream);
564
565 self.consume_and_spill_append(&mut globally_sorted_batches)
566 .await?;
567 self.spill_finish().await?;
568
569 let buffers_cleared_property =
571 self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
572 if !buffers_cleared_property {
573 return internal_err!(
574 "in_mem_batches and globally_sorted_batches should be cleared before"
575 );
576 }
577
578 self.reserve_memory_for_merge()?;
580
581 Ok(())
582 }
583
584 fn in_mem_sort_stream(
643 &mut self,
644 metrics: BaselineMetrics,
645 ) -> Result<SendableRecordBatchStream> {
646 if self.in_mem_batches.is_empty() {
647 return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
648 &self.schema,
649 ))));
650 }
651
652 let elapsed_compute = metrics.elapsed_compute().clone();
655 let _timer = elapsed_compute.timer();
656
657 if self.in_mem_batches.len() == 1 {
664 let batch = self.in_mem_batches.swap_remove(0);
665 let reservation = self.reservation.take();
666 return self.sort_batch_stream(batch, metrics, reservation, true);
667 }
668
669 if self.reservation.size() < self.sort_in_place_threshold_bytes {
671 let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
673 self.in_mem_batches.clear();
674 self.reservation
675 .try_resize(get_reserved_byte_for_record_batch(&batch))
676 .map_err(Self::err_with_oom_context)?;
677 let reservation = self.reservation.take();
678 return self.sort_batch_stream(batch, metrics, reservation, true);
679 }
680
681 let streams = std::mem::take(&mut self.in_mem_batches)
682 .into_iter()
683 .map(|batch| {
684 let metrics = self.metrics.baseline.intermediate();
685 let reservation = self
686 .reservation
687 .split(get_reserved_byte_for_record_batch(&batch));
688 let input = self.sort_batch_stream(
689 batch,
690 metrics,
691 reservation,
692 false,
695 )?;
696 Ok(spawn_buffered(input, 1))
697 })
698 .collect::<Result<_>>()?;
699
700 StreamingMergeBuilder::new()
701 .with_streams(streams)
702 .with_schema(Arc::clone(&self.schema))
703 .with_expressions(&self.expr.clone())
704 .with_metrics(metrics)
705 .with_batch_size(self.batch_size)
706 .with_fetch(None)
707 .with_reservation(self.merge_reservation.new_empty())
708 .build()
709 }
710
711 fn sort_batch_stream(
720 &self,
721 batch: RecordBatch,
722 metrics: BaselineMetrics,
723 reservation: MemoryReservation,
724 mut split: bool,
725 ) -> Result<SendableRecordBatchStream> {
726 assert_eq!(
727 get_reserved_byte_for_record_batch(&batch),
728 reservation.size()
729 );
730
731 split = split && batch.num_rows() > self.batch_size;
732
733 let schema = batch.schema();
734
735 let expressions = self.expr.clone();
736 let stream = futures::stream::once(async move {
737 let _timer = metrics.elapsed_compute().timer();
738
739 let sorted = sort_batch(&batch, &expressions, None)?;
740
741 metrics.record_output(sorted.num_rows());
742 drop(batch);
743 drop(reservation);
744 Ok(sorted)
745 });
746
747 let mut output: SendableRecordBatchStream =
748 Box::pin(RecordBatchStreamAdapter::new(schema, stream));
749
750 if split {
751 output = Box::pin(BatchSplitStream::new(
752 output,
753 self.batch_size,
754 self.metrics.split_metrics.clone(),
755 ));
756 }
757
758 Ok(output)
759 }
760
761 fn reserve_memory_for_merge(&mut self) -> Result<()> {
765 if self.runtime.disk_manager.tmp_files_enabled() {
767 let size = self.sort_spill_reservation_bytes;
768 if self.merge_reservation.size() != size {
769 self.merge_reservation
770 .try_resize(size)
771 .map_err(Self::err_with_oom_context)?;
772 }
773 }
774
775 Ok(())
776 }
777
778 async fn reserve_memory_for_batch_and_maybe_spill(
781 &mut self,
782 input: &RecordBatch,
783 ) -> Result<()> {
784 let size = get_reserved_byte_for_record_batch(input);
785
786 match self.reservation.try_grow(size) {
787 Ok(_) => Ok(()),
788 Err(e) => {
789 if self.in_mem_batches.is_empty() {
790 return Err(Self::err_with_oom_context(e));
791 }
792
793 self.sort_and_spill_in_mem_batches().await?;
795 self.reservation
796 .try_grow(size)
797 .map_err(Self::err_with_oom_context)
798 }
799 }
800 }
801
802 fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
805 match e {
806 DataFusionError::ResourcesExhausted(_) => e.context(
807 "Not enough memory to continue external sort. \
808 Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
809 ),
810 _ => e,
812 }
813 }
814}
815
/// Returns the number of bytes to reserve for a record batch whose in-memory
/// size is `record_batch_size`: twice that size.
///
/// The multiplication saturates at `usize::MAX` rather than overflowing
/// (which would panic in debug builds and wrap to a too-small reservation in
/// release builds).
// NOTE(review): the factor of two presumably covers the batch plus its sorted
// copy — confirm against the sorter's memory model.
pub(crate) fn get_reserved_byte_for_record_batch_size(record_batch_size: usize) -> usize {
    record_batch_size.saturating_mul(2)
}
829
830fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
832 get_reserved_byte_for_record_batch_size(get_record_batch_memory_size(batch))
833}
834
835impl Debug for ExternalSorter {
836 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
837 f.debug_struct("ExternalSorter")
838 .field("memory_used", &self.used())
839 .field("spilled_bytes", &self.spilled_bytes())
840 .field("spilled_rows", &self.spilled_rows())
841 .field("spill_count", &self.spill_count())
842 .finish()
843 }
844}
845
846pub fn sort_batch(
847 batch: &RecordBatch,
848 expressions: &LexOrdering,
849 fetch: Option<usize>,
850) -> Result<RecordBatch> {
851 let sort_columns = expressions
852 .iter()
853 .map(|expr| expr.evaluate_to_sort_column(batch))
854 .collect::<Result<Vec<_>>>()?;
855
856 let indices = lexsort_to_indices(&sort_columns, fetch)?;
857 let mut columns = take_arrays(batch.columns(), &indices, None)?;
858
859 columns.iter_mut().for_each(|c| {
864 c.shrink_to_fit();
865 });
866
867 let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
868 Ok(RecordBatch::try_new_with_options(
869 batch.schema(),
870 columns,
871 &options,
872 )?)
873}
874
875#[derive(Debug, Clone)]
880pub struct SortExec {
881 pub(crate) input: Arc<dyn ExecutionPlan>,
883 expr: LexOrdering,
885 metrics_set: ExecutionPlanMetricsSet,
887 preserve_partitioning: bool,
890 fetch: Option<usize>,
892 common_sort_prefix: Vec<PhysicalSortExpr>,
894 cache: PlanProperties,
896 filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
900}
901
902impl SortExec {
903 pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
906 let preserve_partitioning = false;
907 let (cache, sort_prefix) =
908 Self::compute_properties(&input, expr.clone(), preserve_partitioning)
909 .unwrap();
910 Self {
911 expr,
912 input,
913 metrics_set: ExecutionPlanMetricsSet::new(),
914 preserve_partitioning,
915 fetch: None,
916 common_sort_prefix: sort_prefix,
917 cache,
918 filter: None,
919 }
920 }
921
922 pub fn preserve_partitioning(&self) -> bool {
924 self.preserve_partitioning
925 }
926
927 pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
935 self.preserve_partitioning = preserve_partitioning;
936 self.cache = self
937 .cache
938 .with_partitioning(Self::output_partitioning_helper(
939 &self.input,
940 self.preserve_partitioning,
941 ));
942 self
943 }
944
945 fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
947 let children = self
948 .expr
949 .iter()
950 .map(|sort_expr| Arc::clone(&sort_expr.expr))
951 .collect::<Vec<_>>();
952 Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
953 DynamicFilterPhysicalExpr::new(children, lit(true)),
954 ))))
955 }
956
957 fn cloned(&self) -> Self {
958 SortExec {
959 input: Arc::clone(&self.input),
960 expr: self.expr.clone(),
961 metrics_set: self.metrics_set.clone(),
962 preserve_partitioning: self.preserve_partitioning,
963 common_sort_prefix: self.common_sort_prefix.clone(),
964 fetch: self.fetch,
965 cache: self.cache.clone(),
966 filter: self.filter.clone(),
967 }
968 }
969
970 pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
978 let mut cache = self.cache.clone();
979 let is_pipeline_friendly = matches!(
983 self.cache.emission_type,
984 EmissionType::Incremental | EmissionType::Both
985 );
986 if fetch.is_some() && is_pipeline_friendly {
987 cache = cache.with_boundedness(Boundedness::Bounded);
988 }
989 let filter = fetch.is_some().then(|| {
990 self.filter.clone().unwrap_or_else(|| self.create_filter())
992 });
993 let mut new_sort = self.cloned();
994 new_sort.fetch = fetch;
995 new_sort.cache = cache;
996 new_sort.filter = filter;
997 new_sort
998 }
999
1000 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
1002 &self.input
1003 }
1004
1005 pub fn expr(&self) -> &LexOrdering {
1007 &self.expr
1008 }
1009
1010 pub fn fetch(&self) -> Option<usize> {
1012 self.fetch
1013 }
1014
1015 fn output_partitioning_helper(
1016 input: &Arc<dyn ExecutionPlan>,
1017 preserve_partitioning: bool,
1018 ) -> Partitioning {
1019 if preserve_partitioning {
1021 input.output_partitioning().clone()
1022 } else {
1023 Partitioning::UnknownPartitioning(1)
1024 }
1025 }
1026
1027 fn compute_properties(
1030 input: &Arc<dyn ExecutionPlan>,
1031 sort_exprs: LexOrdering,
1032 preserve_partitioning: bool,
1033 ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
1034 let (sort_prefix, sort_satisfied) = input
1035 .equivalence_properties()
1036 .extract_common_sort_prefix(sort_exprs.clone())?;
1037
1038 let emission_type = if sort_satisfied {
1042 input.pipeline_behavior()
1043 } else {
1044 EmissionType::Final
1045 };
1046
1047 let boundedness = if sort_satisfied {
1053 input.boundedness()
1054 } else {
1055 match input.boundedness() {
1056 Boundedness::Unbounded { .. } => Boundedness::Unbounded {
1057 requires_infinite_memory: true,
1058 },
1059 bounded => bounded,
1060 }
1061 };
1062
1063 let mut eq_properties = input.equivalence_properties().clone();
1066 eq_properties.reorder(sort_exprs)?;
1067
1068 let output_partitioning =
1070 Self::output_partitioning_helper(input, preserve_partitioning);
1071
1072 Ok((
1073 PlanProperties::new(
1074 eq_properties,
1075 output_partitioning,
1076 emission_type,
1077 boundedness,
1078 ),
1079 sort_prefix,
1080 ))
1081 }
1082}
1083
1084impl DisplayAs for SortExec {
1085 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1086 match t {
1087 DisplayFormatType::Default | DisplayFormatType::Verbose => {
1088 let preserve_partitioning = self.preserve_partitioning;
1089 match self.fetch {
1090 Some(fetch) => {
1091 write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
1092 if let Some(filter) = &self.filter {
1093 if let Ok(current) = filter.read().expr().current() {
1094 if !current.eq(&lit(true)) {
1095 write!(f, ", filter=[{current}]")?;
1096 }
1097 }
1098 }
1099 if !self.common_sort_prefix.is_empty() {
1100 write!(f, ", sort_prefix=[")?;
1101 let mut first = true;
1102 for sort_expr in &self.common_sort_prefix {
1103 if first {
1104 first = false;
1105 } else {
1106 write!(f, ", ")?;
1107 }
1108 write!(f, "{sort_expr}")?;
1109 }
1110 write!(f, "]")
1111 } else {
1112 Ok(())
1113 }
1114 }
1115 None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
1116 }
1117 }
1118 DisplayFormatType::TreeRender => match self.fetch {
1119 Some(fetch) => {
1120 writeln!(f, "{}", self.expr)?;
1121 writeln!(f, "limit={fetch}")
1122 }
1123 None => {
1124 writeln!(f, "{}", self.expr)
1125 }
1126 },
1127 }
1128 }
1129}
1130
1131impl ExecutionPlan for SortExec {
1132 fn name(&self) -> &'static str {
1133 match self.fetch {
1134 Some(_) => "SortExec(TopK)",
1135 None => "SortExec",
1136 }
1137 }
1138
1139 fn as_any(&self) -> &dyn Any {
1140 self
1141 }
1142
1143 fn properties(&self) -> &PlanProperties {
1144 &self.cache
1145 }
1146
1147 fn required_input_distribution(&self) -> Vec<Distribution> {
1148 if self.preserve_partitioning {
1149 vec![Distribution::UnspecifiedDistribution]
1150 } else {
1151 vec![Distribution::SinglePartition]
1154 }
1155 }
1156
1157 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1158 vec![&self.input]
1159 }
1160
1161 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
1162 vec![false]
1163 }
1164
1165 fn with_new_children(
1166 self: Arc<Self>,
1167 children: Vec<Arc<dyn ExecutionPlan>>,
1168 ) -> Result<Arc<dyn ExecutionPlan>> {
1169 let mut new_sort = self.cloned();
1170 assert!(
1171 children.len() == 1,
1172 "SortExec should have exactly one child"
1173 );
1174 new_sort.input = Arc::clone(&children[0]);
1175 let (cache, sort_prefix) = Self::compute_properties(
1177 &new_sort.input,
1178 new_sort.expr.clone(),
1179 new_sort.preserve_partitioning,
1180 )?;
1181 new_sort.cache = cache;
1182 new_sort.common_sort_prefix = sort_prefix;
1183
1184 Ok(Arc::new(new_sort))
1185 }
1186
1187 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1188 let children = self.children().into_iter().cloned().collect();
1189 let new_sort = self.with_new_children(children)?;
1190 let mut new_sort = new_sort
1191 .as_any()
1192 .downcast_ref::<SortExec>()
1193 .expect("cloned 1 lines above this line, we know the type")
1194 .clone();
1195 new_sort.filter = Some(new_sort.create_filter());
1197 new_sort.metrics_set = ExecutionPlanMetricsSet::new();
1198
1199 Ok(Arc::new(new_sort))
1200 }
1201
1202 fn execute(
1203 &self,
1204 partition: usize,
1205 context: Arc<TaskContext>,
1206 ) -> Result<SendableRecordBatchStream> {
1207 trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
1208
1209 let mut input = self.input.execute(partition, Arc::clone(&context))?;
1210
1211 let execution_options = &context.session_config().options().execution;
1212
1213 trace!("End SortExec's input.execute for partition: {partition}");
1214
1215 let sort_satisfied = self
1216 .input
1217 .equivalence_properties()
1218 .ordering_satisfy(self.expr.clone())?;
1219
1220 match (sort_satisfied, self.fetch.as_ref()) {
1221 (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
1222 input,
1223 0,
1224 Some(*fetch),
1225 BaselineMetrics::new(&self.metrics_set, partition),
1226 ))),
1227 (true, None) => Ok(input),
1228 (false, Some(fetch)) => {
1229 let filter = self.filter.clone();
1230 let mut topk = TopK::try_new(
1231 partition,
1232 input.schema(),
1233 self.common_sort_prefix.clone(),
1234 self.expr.clone(),
1235 *fetch,
1236 context.session_config().batch_size(),
1237 context.runtime_env(),
1238 &self.metrics_set,
1239 Arc::clone(&unwrap_or_internal_err!(filter)),
1240 )?;
1241 Ok(Box::pin(RecordBatchStreamAdapter::new(
1242 self.schema(),
1243 futures::stream::once(async move {
1244 while let Some(batch) = input.next().await {
1245 let batch = batch?;
1246 topk.insert_batch(batch)?;
1247 if topk.finished {
1248 break;
1249 }
1250 }
1251 topk.emit()
1252 })
1253 .try_flatten(),
1254 )))
1255 }
1256 (false, None) => {
1257 let mut sorter = ExternalSorter::new(
1258 partition,
1259 input.schema(),
1260 self.expr.clone(),
1261 context.session_config().batch_size(),
1262 execution_options.sort_spill_reservation_bytes,
1263 execution_options.sort_in_place_threshold_bytes,
1264 context.session_config().spill_compression(),
1265 &self.metrics_set,
1266 context.runtime_env(),
1267 )?;
1268 Ok(Box::pin(RecordBatchStreamAdapter::new(
1269 self.schema(),
1270 futures::stream::once(async move {
1271 while let Some(batch) = input.next().await {
1272 let batch = batch?;
1273 sorter.insert_batch(batch).await?;
1274 }
1275 sorter.sort().await
1276 })
1277 .try_flatten(),
1278 )))
1279 }
1280 }
1281 }
1282
1283 fn metrics(&self) -> Option<MetricsSet> {
1284 Some(self.metrics_set.clone_inner())
1285 }
1286
1287 fn statistics(&self) -> Result<Statistics> {
1288 self.partition_statistics(None)
1289 }
1290
1291 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1292 if !self.preserve_partitioning() {
1293 return self
1294 .input
1295 .partition_statistics(None)?
1296 .with_fetch(self.fetch, 0, 1);
1297 }
1298 self.input
1299 .partition_statistics(partition)?
1300 .with_fetch(self.fetch, 0, 1)
1301 }
1302
1303 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1304 Some(Arc::new(SortExec::with_fetch(self, limit)))
1305 }
1306
1307 fn fetch(&self) -> Option<usize> {
1308 self.fetch
1309 }
1310
1311 fn cardinality_effect(&self) -> CardinalityEffect {
1312 if self.fetch.is_none() {
1313 CardinalityEffect::Equal
1314 } else {
1315 CardinalityEffect::LowerEqual
1316 }
1317 }
1318
1319 fn try_swapping_with_projection(
1323 &self,
1324 projection: &ProjectionExec,
1325 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1326 if projection.expr().len() >= projection.input().schema().fields().len() {
1328 return Ok(None);
1329 }
1330
1331 let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
1332 else {
1333 return Ok(None);
1334 };
1335
1336 Ok(Some(Arc::new(
1337 SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
1338 .with_fetch(self.fetch())
1339 .with_preserve_partitioning(self.preserve_partitioning()),
1340 )))
1341 }
1342
1343 fn gather_filters_for_pushdown(
1344 &self,
1345 phase: FilterPushdownPhase,
1346 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1347 config: &datafusion_common::config::ConfigOptions,
1348 ) -> Result<FilterDescription> {
1349 if !matches!(phase, FilterPushdownPhase::Post) {
1350 return FilterDescription::from_children(parent_filters, &self.children());
1351 }
1352
1353 let mut child =
1354 ChildFilterDescription::from_child(&parent_filters, self.input())?;
1355
1356 if let Some(filter) = &self.filter {
1357 if config.optimizer.enable_topk_dynamic_filter_pushdown {
1358 child = child.with_self_filter(filter.read().expr());
1359 }
1360 }
1361
1362 Ok(FilterDescription::new().with_child(child))
1363 }
1364}
1365
1366#[cfg(test)]
1367mod tests {
1368 use std::collections::HashMap;
1369 use std::pin::Pin;
1370 use std::task::{Context, Poll};
1371
1372 use super::*;
1373 use crate::coalesce_partitions::CoalescePartitionsExec;
1374 use crate::collect;
1375 use crate::execution_plan::Boundedness;
1376 use crate::expressions::col;
1377 use crate::test;
1378 use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
1379 use crate::test::TestMemoryExec;
1380 use crate::test::{assert_is_pending, make_partition};
1381
1382 use arrow::array::*;
1383 use arrow::compute::SortOptions;
1384 use arrow::datatypes::*;
1385 use datafusion_common::cast::as_primitive_array;
1386 use datafusion_common::test_util::batches_to_string;
1387 use datafusion_common::{DataFusionError, Result, ScalarValue};
1388 use datafusion_execution::config::SessionConfig;
1389 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1390 use datafusion_execution::RecordBatchStream;
1391 use datafusion_physical_expr::expressions::{Column, Literal};
1392 use datafusion_physical_expr::EquivalenceProperties;
1393
1394 use futures::{FutureExt, Stream};
1395 use insta::assert_snapshot;
1396
1397 #[derive(Debug, Clone)]
1398 pub struct SortedUnboundedExec {
1399 schema: Schema,
1400 batch_size: u64,
1401 cache: PlanProperties,
1402 }
1403
1404 impl DisplayAs for SortedUnboundedExec {
1405 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1406 match t {
1407 DisplayFormatType::Default
1408 | DisplayFormatType::Verbose
1409 | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1410 }
1411 Ok(())
1412 }
1413 }
1414
1415 impl SortedUnboundedExec {
1416 fn compute_properties(schema: SchemaRef) -> PlanProperties {
1417 let mut eq_properties = EquivalenceProperties::new(schema);
1418 eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
1419 Column::new("c1", 0),
1420 ))]);
1421 PlanProperties::new(
1422 eq_properties,
1423 Partitioning::UnknownPartitioning(1),
1424 EmissionType::Final,
1425 Boundedness::Unbounded {
1426 requires_infinite_memory: false,
1427 },
1428 )
1429 }
1430 }
1431
1432 impl ExecutionPlan for SortedUnboundedExec {
1433 fn name(&self) -> &'static str {
1434 Self::static_name()
1435 }
1436
1437 fn as_any(&self) -> &dyn Any {
1438 self
1439 }
1440
1441 fn properties(&self) -> &PlanProperties {
1442 &self.cache
1443 }
1444
1445 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1446 vec![]
1447 }
1448
1449 fn with_new_children(
1450 self: Arc<Self>,
1451 _: Vec<Arc<dyn ExecutionPlan>>,
1452 ) -> Result<Arc<dyn ExecutionPlan>> {
1453 Ok(self)
1454 }
1455
1456 fn execute(
1457 &self,
1458 _partition: usize,
1459 _context: Arc<TaskContext>,
1460 ) -> Result<SendableRecordBatchStream> {
1461 Ok(Box::pin(SortedUnboundedStream {
1462 schema: Arc::new(self.schema.clone()),
1463 batch_size: self.batch_size,
1464 offset: 0,
1465 }))
1466 }
1467 }
1468
1469 #[derive(Debug)]
1470 pub struct SortedUnboundedStream {
1471 schema: SchemaRef,
1472 batch_size: u64,
1473 offset: u64,
1474 }
1475
1476 impl Stream for SortedUnboundedStream {
1477 type Item = Result<RecordBatch>;
1478
1479 fn poll_next(
1480 mut self: Pin<&mut Self>,
1481 _cx: &mut Context<'_>,
1482 ) -> Poll<Option<Self::Item>> {
1483 let batch = SortedUnboundedStream::create_record_batch(
1484 Arc::clone(&self.schema),
1485 self.offset,
1486 self.batch_size,
1487 );
1488 self.offset += self.batch_size;
1489 Poll::Ready(Some(Ok(batch)))
1490 }
1491 }
1492
1493 impl RecordBatchStream for SortedUnboundedStream {
1494 fn schema(&self) -> SchemaRef {
1495 Arc::clone(&self.schema)
1496 }
1497 }
1498
1499 impl SortedUnboundedStream {
1500 fn create_record_batch(
1501 schema: SchemaRef,
1502 offset: u64,
1503 batch_size: u64,
1504 ) -> RecordBatch {
1505 let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1506 let array = UInt64Array::from(values);
1507 let array_ref: ArrayRef = Arc::new(array);
1508 RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1509 }
1510 }
1511
1512 #[tokio::test]
1513 async fn test_in_mem_sort() -> Result<()> {
1514 let task_ctx = Arc::new(TaskContext::default());
1515 let partitions = 4;
1516 let csv = test::scan_partitioned(partitions);
1517 let schema = csv.schema();
1518
1519 let sort_exec = Arc::new(SortExec::new(
1520 [PhysicalSortExpr {
1521 expr: col("i", &schema)?,
1522 options: SortOptions::default(),
1523 }]
1524 .into(),
1525 Arc::new(CoalescePartitionsExec::new(csv)),
1526 ));
1527
1528 let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
1529
1530 assert_eq!(result.len(), 1);
1531 assert_eq!(result[0].num_rows(), 400);
1532 assert_eq!(
1533 task_ctx.runtime_env().memory_pool.reserved(),
1534 0,
1535 "The sort should have returned all memory used back to the memory manager"
1536 );
1537
1538 Ok(())
1539 }
1540
1541 #[tokio::test]
1542 async fn test_sort_spill() -> Result<()> {
1543 let session_config = SessionConfig::new();
1545 let sort_spill_reservation_bytes = session_config
1546 .options()
1547 .execution
1548 .sort_spill_reservation_bytes;
1549 let runtime = RuntimeEnvBuilder::new()
1550 .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
1551 .build_arc()?;
1552 let task_ctx = Arc::new(
1553 TaskContext::default()
1554 .with_session_config(session_config)
1555 .with_runtime(runtime),
1556 );
1557
1558 let partitions = 100;
1562 let input = test::scan_partitioned(partitions);
1563 let schema = input.schema();
1564
1565 let sort_exec = Arc::new(SortExec::new(
1566 [PhysicalSortExpr {
1567 expr: col("i", &schema)?,
1568 options: SortOptions::default(),
1569 }]
1570 .into(),
1571 Arc::new(CoalescePartitionsExec::new(input)),
1572 ));
1573
1574 let result = collect(
1575 Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
1576 Arc::clone(&task_ctx),
1577 )
1578 .await?;
1579
1580 assert_eq!(result.len(), 2);
1581
1582 let metrics = sort_exec.metrics().unwrap();
1584
1585 assert_eq!(metrics.output_rows().unwrap(), 10000);
1586 assert!(metrics.elapsed_compute().unwrap() > 0);
1587
1588 let spill_count = metrics.spill_count().unwrap();
1589 let spilled_rows = metrics.spilled_rows().unwrap();
1590 let spilled_bytes = metrics.spilled_bytes().unwrap();
1591 assert!((3..=10).contains(&spill_count));
1595 assert!((9000..=10000).contains(&spilled_rows));
1596 assert!((38000..=44000).contains(&spilled_bytes));
1597
1598 let columns = result[0].columns();
1599
1600 let i = as_primitive_array::<Int32Type>(&columns[0])?;
1601 assert_eq!(i.value(0), 0);
1602 assert_eq!(i.value(i.len() - 1), 81);
1603 assert_eq!(
1604 task_ctx.runtime_env().memory_pool.reserved(),
1605 0,
1606 "The sort should have returned all memory used back to the memory manager"
1607 );
1608
1609 Ok(())
1610 }
1611
1612 #[tokio::test]
1613 async fn test_batch_reservation_error() -> Result<()> {
1614 let expected_batch_reservation = 800;
1617 let merge_reservation: usize = 0; let memory_limit: usize = expected_batch_reservation + merge_reservation - 1; let session_config =
1621 SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
1622 let runtime = RuntimeEnvBuilder::new()
1623 .with_memory_limit(memory_limit, 1.0)
1624 .build_arc()?;
1625 let task_ctx = Arc::new(
1626 TaskContext::default()
1627 .with_session_config(session_config)
1628 .with_runtime(runtime),
1629 );
1630
1631 let plan = test::scan_partitioned(1);
1632
1633 {
1636 let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
1637 let first_batch = stream.next().await.unwrap()?;
1638 let batch_reservation = get_reserved_byte_for_record_batch(&first_batch);
1639
1640 assert_eq!(batch_reservation, expected_batch_reservation);
1641 assert!(memory_limit < (merge_reservation + batch_reservation));
1642 }
1643
1644 let sort_exec = Arc::new(SortExec::new(
1645 [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
1646 plan,
1647 ));
1648
1649 let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;
1650
1651 let err = result.unwrap_err();
1652 assert!(
1653 matches!(err, DataFusionError::Context(..)),
1654 "Assertion failed: expected a Context error, but got: {err:?}"
1655 );
1656
1657 assert!(
1659 matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
1660 "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
1661 );
1662
1663 Ok(())
1664 }
1665
1666 #[tokio::test]
1667 async fn test_sort_spill_utf8_strings() -> Result<()> {
1668 let session_config = SessionConfig::new()
1669 .with_batch_size(100)
1670 .with_sort_in_place_threshold_bytes(20 * 1024)
1671 .with_sort_spill_reservation_bytes(100 * 1024);
1672 let runtime = RuntimeEnvBuilder::new()
1673 .with_memory_limit(500 * 1024, 1.0)
1674 .build_arc()?;
1675 let task_ctx = Arc::new(
1676 TaskContext::default()
1677 .with_session_config(session_config)
1678 .with_runtime(runtime),
1679 );
1680
1681 let input = test::scan_partitioned_utf8(200);
1685 let schema = input.schema();
1686
1687 let sort_exec = Arc::new(SortExec::new(
1688 [PhysicalSortExpr {
1689 expr: col("i", &schema)?,
1690 options: SortOptions::default(),
1691 }]
1692 .into(),
1693 Arc::new(CoalescePartitionsExec::new(input)),
1694 ));
1695
1696 let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1697
1698 let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
1699 assert_eq!(num_rows, 20000);
1700
1701 let metrics = sort_exec.metrics().unwrap();
1703
1704 assert_eq!(metrics.output_rows().unwrap(), 20000);
1705 assert!(metrics.elapsed_compute().unwrap() > 0);
1706
1707 let spill_count = metrics.spill_count().unwrap();
1708 let spilled_rows = metrics.spilled_rows().unwrap();
1709 let spilled_bytes = metrics.spilled_bytes().unwrap();
1710
1711 assert!((4..=8).contains(&spill_count));
1725 assert!((15000..=20000).contains(&spilled_rows));
1726 assert!((900000..=1000000).contains(&spilled_bytes));
1727
1728 let concated_result = concat_batches(&schema, &result)?;
1730 let columns = concated_result.columns();
1731 let string_array = as_string_array(&columns[0]);
1732 for i in 0..string_array.len() - 1 {
1733 assert!(string_array.value(i) <= string_array.value(i + 1));
1734 }
1735
1736 assert_eq!(
1737 task_ctx.runtime_env().memory_pool.reserved(),
1738 0,
1739 "The sort should have returned all memory used back to the memory manager"
1740 );
1741
1742 Ok(())
1743 }
1744
1745 #[tokio::test]
1746 async fn test_sort_fetch_memory_calculation() -> Result<()> {
1747 let avg_batch_size = 400;
1749 let partitions = 4;
1750
1751 let test_options = vec![
1753 (None, true),
1756 (Some(1), false),
1759 ];
1760
1761 for (fetch, expect_spillage) in test_options {
1762 let session_config = SessionConfig::new();
1763 let sort_spill_reservation_bytes = session_config
1764 .options()
1765 .execution
1766 .sort_spill_reservation_bytes;
1767
1768 let runtime = RuntimeEnvBuilder::new()
1769 .with_memory_limit(
1770 sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1771 1.0,
1772 )
1773 .build_arc()?;
1774 let task_ctx = Arc::new(
1775 TaskContext::default()
1776 .with_runtime(runtime)
1777 .with_session_config(session_config),
1778 );
1779
1780 let csv = test::scan_partitioned(partitions);
1781 let schema = csv.schema();
1782
1783 let sort_exec = Arc::new(
1784 SortExec::new(
1785 [PhysicalSortExpr {
1786 expr: col("i", &schema)?,
1787 options: SortOptions::default(),
1788 }]
1789 .into(),
1790 Arc::new(CoalescePartitionsExec::new(csv)),
1791 )
1792 .with_fetch(fetch),
1793 );
1794
1795 let result =
1796 collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1797 assert_eq!(result.len(), 1);
1798
1799 let metrics = sort_exec.metrics().unwrap();
1800 let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
1801 assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
1802 }
1803 Ok(())
1804 }
1805
1806 #[tokio::test]
1807 async fn test_sort_metadata() -> Result<()> {
1808 let task_ctx = Arc::new(TaskContext::default());
1809 let field_metadata: HashMap<String, String> =
1810 vec![("foo".to_string(), "bar".to_string())]
1811 .into_iter()
1812 .collect();
1813 let schema_metadata: HashMap<String, String> =
1814 vec![("baz".to_string(), "barf".to_string())]
1815 .into_iter()
1816 .collect();
1817
1818 let mut field = Field::new("field_name", DataType::UInt64, true);
1819 field.set_metadata(field_metadata.clone());
1820 let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1821 let schema = Arc::new(schema);
1822
1823 let data: ArrayRef =
1824 Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1825
1826 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1827 let input =
1828 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1829
1830 let sort_exec = Arc::new(SortExec::new(
1831 [PhysicalSortExpr {
1832 expr: col("field_name", &schema)?,
1833 options: SortOptions::default(),
1834 }]
1835 .into(),
1836 input,
1837 ));
1838
1839 let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1840
1841 let expected_data: ArrayRef =
1842 Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1843 let expected_batch =
1844 RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
1845
1846 assert_eq!(&vec![expected_batch], &result);
1848
1849 assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
1851 assert_eq!(result[0].schema().metadata(), &schema_metadata);
1852
1853 Ok(())
1854 }
1855
1856 #[tokio::test]
1857 async fn test_lex_sort_by_mixed_types() -> Result<()> {
1858 let task_ctx = Arc::new(TaskContext::default());
1859 let schema = Arc::new(Schema::new(vec![
1860 Field::new("a", DataType::Int32, true),
1861 Field::new(
1862 "b",
1863 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1864 true,
1865 ),
1866 ]));
1867
1868 let batch = RecordBatch::try_new(
1870 Arc::clone(&schema),
1871 vec![
1872 Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
1873 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1874 Some(vec![Some(3)]),
1875 Some(vec![Some(1)]),
1876 Some(vec![Some(6), None]),
1877 Some(vec![Some(5)]),
1878 ])),
1879 ],
1880 )?;
1881
1882 let sort_exec = Arc::new(SortExec::new(
1883 [
1884 PhysicalSortExpr {
1885 expr: col("a", &schema)?,
1886 options: SortOptions {
1887 descending: false,
1888 nulls_first: true,
1889 },
1890 },
1891 PhysicalSortExpr {
1892 expr: col("b", &schema)?,
1893 options: SortOptions {
1894 descending: true,
1895 nulls_first: false,
1896 },
1897 },
1898 ]
1899 .into(),
1900 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
1901 ));
1902
1903 assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
1904 assert_eq!(
1905 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1906 *sort_exec.schema().field(1).data_type()
1907 );
1908
1909 let result: Vec<RecordBatch> =
1910 collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
1911 let metrics = sort_exec.metrics().unwrap();
1912 assert!(metrics.elapsed_compute().unwrap() > 0);
1913 assert_eq!(metrics.output_rows().unwrap(), 4);
1914 assert_eq!(result.len(), 1);
1915
1916 let expected = RecordBatch::try_new(
1917 schema,
1918 vec![
1919 Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
1920 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1921 Some(vec![Some(1)]),
1922 Some(vec![Some(6), None]),
1923 Some(vec![Some(5)]),
1924 Some(vec![Some(3)]),
1925 ])),
1926 ],
1927 )?;
1928
1929 assert_eq!(expected, result[0]);
1930
1931 Ok(())
1932 }
1933
1934 #[tokio::test]
1935 async fn test_lex_sort_by_float() -> Result<()> {
1936 let task_ctx = Arc::new(TaskContext::default());
1937 let schema = Arc::new(Schema::new(vec![
1938 Field::new("a", DataType::Float32, true),
1939 Field::new("b", DataType::Float64, true),
1940 ]));
1941
1942 let batch = RecordBatch::try_new(
1944 Arc::clone(&schema),
1945 vec![
1946 Arc::new(Float32Array::from(vec![
1947 Some(f32::NAN),
1948 None,
1949 None,
1950 Some(f32::NAN),
1951 Some(1.0_f32),
1952 Some(1.0_f32),
1953 Some(2.0_f32),
1954 Some(3.0_f32),
1955 ])),
1956 Arc::new(Float64Array::from(vec![
1957 Some(200.0_f64),
1958 Some(20.0_f64),
1959 Some(10.0_f64),
1960 Some(100.0_f64),
1961 Some(f64::NAN),
1962 None,
1963 None,
1964 Some(f64::NAN),
1965 ])),
1966 ],
1967 )?;
1968
1969 let sort_exec = Arc::new(SortExec::new(
1970 [
1971 PhysicalSortExpr {
1972 expr: col("a", &schema)?,
1973 options: SortOptions {
1974 descending: true,
1975 nulls_first: true,
1976 },
1977 },
1978 PhysicalSortExpr {
1979 expr: col("b", &schema)?,
1980 options: SortOptions {
1981 descending: false,
1982 nulls_first: false,
1983 },
1984 },
1985 ]
1986 .into(),
1987 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
1988 ));
1989
1990 assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
1991 assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
1992
1993 let result: Vec<RecordBatch> =
1994 collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
1995 let metrics = sort_exec.metrics().unwrap();
1996 assert!(metrics.elapsed_compute().unwrap() > 0);
1997 assert_eq!(metrics.output_rows().unwrap(), 8);
1998 assert_eq!(result.len(), 1);
1999
2000 let columns = result[0].columns();
2001
2002 assert_eq!(DataType::Float32, *columns[0].data_type());
2003 assert_eq!(DataType::Float64, *columns[1].data_type());
2004
2005 let a = as_primitive_array::<Float32Type>(&columns[0])?;
2006 let b = as_primitive_array::<Float64Type>(&columns[1])?;
2007
2008 let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
2010 .map(|i| {
2011 let aval = if a.is_valid(i) {
2012 Some(a.value(i).to_string())
2013 } else {
2014 None
2015 };
2016 let bval = if b.is_valid(i) {
2017 Some(b.value(i).to_string())
2018 } else {
2019 None
2020 };
2021 (aval, bval)
2022 })
2023 .collect();
2024
2025 let expected: Vec<(Option<String>, Option<String>)> = vec![
2026 (None, Some("10".to_owned())),
2027 (None, Some("20".to_owned())),
2028 (Some("NaN".to_owned()), Some("100".to_owned())),
2029 (Some("NaN".to_owned()), Some("200".to_owned())),
2030 (Some("3".to_owned()), Some("NaN".to_owned())),
2031 (Some("2".to_owned()), None),
2032 (Some("1".to_owned()), Some("NaN".to_owned())),
2033 (Some("1".to_owned()), None),
2034 ];
2035
2036 assert_eq!(expected, result);
2037
2038 Ok(())
2039 }
2040
2041 #[tokio::test]
2042 async fn test_drop_cancel() -> Result<()> {
2043 let task_ctx = Arc::new(TaskContext::default());
2044 let schema =
2045 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2046
2047 let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2048 let refs = blocking_exec.refs();
2049 let sort_exec = Arc::new(SortExec::new(
2050 [PhysicalSortExpr {
2051 expr: col("a", &schema)?,
2052 options: SortOptions::default(),
2053 }]
2054 .into(),
2055 blocking_exec,
2056 ));
2057
2058 let fut = collect(sort_exec, Arc::clone(&task_ctx));
2059 let mut fut = fut.boxed();
2060
2061 assert_is_pending(&mut fut);
2062 drop(fut);
2063 assert_strong_count_converges_to_zero(refs).await;
2064
2065 assert_eq!(
2066 task_ctx.runtime_env().memory_pool.reserved(),
2067 0,
2068 "The sort should have returned all memory used back to the memory manager"
2069 );
2070
2071 Ok(())
2072 }
2073
/// Calls `sort_batch` on a batch that has no columns but a row count of one,
/// sorting by a literal expression, and checks the row count is preserved.
#[test]
fn test_empty_sort_batch() {
    let schema = Arc::new(Schema::empty());
    // With no columns, the row count has to be supplied explicitly.
    let batch = RecordBatch::try_new_with_options(
        Arc::clone(&schema),
        vec![],
        &RecordBatchOptions::new().with_row_count(Some(1)),
    )
    .unwrap();

    let literal_key =
        PhysicalSortExpr::new_default(Arc::new(Literal::new(ScalarValue::Int64(Some(1)))));
    let expressions: LexOrdering = [literal_key].into();

    let sorted = sort_batch(&batch, &expressions, None).unwrap();
    assert_eq!(sorted.num_rows(), 1);
}
2091
/// Runs a sort with `fetch = 9` over a `SortedUnboundedExec` source and
/// snapshots the collected output.
#[tokio::test]
async fn topk_unbounded_source() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
    // Source configured with a batch size of 2 over the single `c1` column.
    let source = SortedUnboundedExec {
        schema: schema.clone(),
        batch_size: 2,
        cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
    };
    let mut plan = SortExec::new(
        [PhysicalSortExpr::new_default(Arc::new(Column::new(
            "c1", 0,
        )))]
        .into(),
        Arc::new(source),
    );
    // Limit the sort to the first nine rows.
    plan = plan.with_fetch(Some(9));

    let batches = collect(Arc::new(plan), task_ctx).await?;
    // The snapshot lists the values 0 through 8.
    assert_snapshot!(batches_to_string(&batches), @r#"
    +----+
    | c1 |
    +----+
    | 0 |
    | 1 |
    | 2 |
    | 3 |
    | 4 |
    | 5 |
    | 6 |
    | 7 |
    | 8 |
    +----+
    "#);
    Ok(())
}
2128
2129 #[tokio::test]
2130 async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2131 let batch_size = 100;
2132
2133 let create_task_ctx = |_: &[RecordBatch]| {
2134 TaskContext::default().with_session_config(
2135 SessionConfig::new()
2136 .with_batch_size(batch_size)
2137 .with_sort_in_place_threshold_bytes(usize::MAX),
2138 )
2139 };
2140
2141 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2143
2144 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2146
2147 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2149
2150 Ok(())
2151 }
2152
2153 #[tokio::test]
2154 async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place(
2155 ) -> Result<()> {
2156 let batch_size = 100;
2157
2158 let create_task_ctx = |_: &[RecordBatch]| {
2159 TaskContext::default().with_session_config(
2160 SessionConfig::new()
2161 .with_batch_size(batch_size)
2162 .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2163 )
2164 };
2165
2166 {
2168 let metrics =
2169 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2170
2171 assert_eq!(
2172 metrics.spill_count(),
2173 Some(0),
2174 "Expected no spills when sorting in place"
2175 );
2176 }
2177
2178 {
2180 let metrics =
2181 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2182
2183 assert_eq!(
2184 metrics.spill_count(),
2185 Some(0),
2186 "Expected no spills when sorting in place"
2187 );
2188 }
2189
2190 {
2192 let metrics =
2193 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2194
2195 assert_eq!(
2196 metrics.spill_count(),
2197 Some(0),
2198 "Expected no spills when sorting in place"
2199 );
2200 }
2201
2202 Ok(())
2203 }
2204
2205 #[tokio::test]
2206 async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch(
2207 ) -> Result<()> {
2208 let batch_size = 100;
2209
2210 let create_task_ctx = |_: &[RecordBatch]| {
2211 TaskContext::default()
2212 .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2213 };
2214
2215 {
2217 let metrics = test_sort_output_batch_size(
2218 1,
2220 batch_size / 4,
2221 create_task_ctx,
2222 )
2223 .await?;
2224
2225 assert_eq!(
2226 metrics.spill_count(),
2227 Some(0),
2228 "Expected no spills when sorting in place"
2229 );
2230 }
2231
2232 {
2234 let metrics = test_sort_output_batch_size(
2235 1,
2237 batch_size + 7,
2238 create_task_ctx,
2239 )
2240 .await?;
2241
2242 assert_eq!(
2243 metrics.spill_count(),
2244 Some(0),
2245 "Expected no spills when sorting in place"
2246 );
2247 }
2248
2249 {
2251 let metrics = test_sort_output_batch_size(
2252 1,
2254 batch_size * 3,
2255 create_task_ctx,
2256 )
2257 .await?;
2258
2259 assert_eq!(
2260 metrics.spill_count(),
2261 Some(0),
2262 "Expected no spills when sorting in place"
2263 );
2264 }
2265
2266 Ok(())
2267 }
2268
2269 #[tokio::test]
2270 async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill(
2271 ) -> Result<()> {
2272 let batch_size = 100;
2273
2274 let create_task_ctx = |generated_batches: &[RecordBatch]| {
2275 let batches_memory = generated_batches
2276 .iter()
2277 .map(|b| b.get_array_memory_size())
2278 .sum::<usize>();
2279
2280 TaskContext::default()
2281 .with_session_config(
2282 SessionConfig::new()
2283 .with_batch_size(batch_size)
2284 .with_sort_in_place_threshold_bytes(1)
2286 .with_sort_spill_reservation_bytes(1),
2287 )
2288 .with_runtime(
2289 RuntimeEnvBuilder::default()
2290 .with_memory_limit(batches_memory, 1.0)
2291 .build_arc()
2292 .unwrap(),
2293 )
2294 };
2295
2296 {
2298 let metrics =
2299 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2300
2301 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2302 }
2303
2304 {
2306 let metrics =
2307 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2308
2309 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2310 }
2311
2312 {
2314 let metrics =
2315 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2316
2317 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2318 }
2319
2320 Ok(())
2321 }
2322
2323 async fn test_sort_output_batch_size(
2324 number_of_batches: usize,
2325 batch_size_to_generate: usize,
2326 create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2327 ) -> Result<MetricsSet> {
2328 let batches = (0..number_of_batches)
2329 .map(|_| make_partition(batch_size_to_generate as i32))
2330 .collect::<Vec<_>>();
2331 let task_ctx = create_task_ctx(batches.as_slice());
2332
2333 let expected_batch_size = task_ctx.session_config().batch_size();
2334
2335 let (mut output_batches, metrics) =
2336 run_sort_on_input(task_ctx, "i", batches).await?;
2337
2338 let last_batch = output_batches.pop().unwrap();
2339
2340 for batch in output_batches {
2341 assert_eq!(batch.num_rows(), expected_batch_size);
2342 }
2343
2344 let mut last_expected_batch_size =
2345 (batch_size_to_generate * number_of_batches) % expected_batch_size;
2346 if last_expected_batch_size == 0 {
2347 last_expected_batch_size = expected_batch_size;
2348 }
2349 assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2350
2351 Ok(metrics)
2352 }
2353
2354 async fn run_sort_on_input(
2355 task_ctx: TaskContext,
2356 order_by_col: &str,
2357 batches: Vec<RecordBatch>,
2358 ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2359 let task_ctx = Arc::new(task_ctx);
2360
2361 let schema = batches[0].schema();
2363 let ordering: LexOrdering = [PhysicalSortExpr {
2364 expr: col(order_by_col, &schema)?,
2365 options: SortOptions {
2366 descending: false,
2367 nulls_first: true,
2368 },
2369 }]
2370 .into();
2371 let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2372 ordering.clone(),
2373 TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2374 ));
2375
2376 let sorted_batches =
2377 collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2378
2379 let metrics = sort_exec.metrics().expect("sort have metrics");
2380
2381 {
2383 let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2384 let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2385
2386 let sorted_batches_concat =
2387 concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2388
2389 assert_eq!(sorted_input_batch, sorted_batches_concat);
2390 }
2391
2392 Ok((sorted_batches, metrics))
2393 }
2394}