1use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28
29use crate::common::spawn_buffered;
30use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
31use crate::expressions::PhysicalSortExpr;
32use crate::filter_pushdown::{
33 ChildFilterDescription, FilterDescription, FilterPushdownPhase,
34};
35use crate::limit::LimitStream;
36use crate::metrics::{
37 BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, SplitMetrics,
38};
39use crate::projection::{make_with_child, update_ordering, ProjectionExec};
40use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
41use crate::spill::get_record_batch_memory_size;
42use crate::spill::in_progress_spill_file::InProgressSpillFile;
43use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
44use crate::stream::BatchSplitStream;
45use crate::stream::RecordBatchStreamAdapter;
46use crate::topk::TopK;
47use crate::topk::TopKDynamicFilters;
48use crate::{
49 DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
50 ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
51 Statistics,
52};
53
54use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
55use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
56use arrow::datatypes::SchemaRef;
57use datafusion_common::config::SpillCompression;
58use datafusion_common::{
59 internal_datafusion_err, internal_err, unwrap_or_internal_err, DataFusionError,
60 Result,
61};
62use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
63use datafusion_execution::runtime_env::RuntimeEnv;
64use datafusion_execution::TaskContext;
65use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
66use datafusion_physical_expr::LexOrdering;
67use datafusion_physical_expr::PhysicalExpr;
68
69use futures::{StreamExt, TryStreamExt};
70use log::{debug, trace};
71
72struct ExternalSorterMetrics {
73 baseline: BaselineMetrics,
75
76 spill_metrics: SpillMetrics,
77
78 split_metrics: SplitMetrics,
79}
80
81impl ExternalSorterMetrics {
82 fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
83 Self {
84 baseline: BaselineMetrics::new(metrics, partition),
85 spill_metrics: SpillMetrics::new(metrics, partition),
86 split_metrics: SplitMetrics::new(metrics, partition),
87 }
88 }
89}
90
91struct ExternalSorter {
210 schema: SchemaRef,
216 expr: LexOrdering,
218 batch_size: usize,
220 sort_in_place_threshold_bytes: usize,
224
225 in_mem_batches: Vec<RecordBatch>,
231
232 in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
239 finished_spill_files: Vec<SortedSpillFile>,
244
245 metrics: ExternalSorterMetrics,
251 runtime: Arc<RuntimeEnv>,
253 reservation: MemoryReservation,
255 spill_manager: SpillManager,
256
257 merge_reservation: MemoryReservation,
261 sort_spill_reservation_bytes: usize,
264}
265
266impl ExternalSorter {
267 #[allow(clippy::too_many_arguments)]
270 pub fn new(
271 partition_id: usize,
272 schema: SchemaRef,
273 expr: LexOrdering,
274 batch_size: usize,
275 sort_spill_reservation_bytes: usize,
276 sort_in_place_threshold_bytes: usize,
277 spill_compression: SpillCompression,
279 metrics: &ExecutionPlanMetricsSet,
280 runtime: Arc<RuntimeEnv>,
281 ) -> Result<Self> {
282 let metrics = ExternalSorterMetrics::new(metrics, partition_id);
283 let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
284 .with_can_spill(true)
285 .register(&runtime.memory_pool);
286
287 let merge_reservation =
288 MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
289 .register(&runtime.memory_pool);
290
291 let spill_manager = SpillManager::new(
292 Arc::clone(&runtime),
293 metrics.spill_metrics.clone(),
294 Arc::clone(&schema),
295 )
296 .with_compression_type(spill_compression);
297
298 Ok(Self {
299 schema,
300 in_mem_batches: vec![],
301 in_progress_spill_file: None,
302 finished_spill_files: vec![],
303 expr,
304 metrics,
305 reservation,
306 spill_manager,
307 merge_reservation,
308 runtime,
309 batch_size,
310 sort_spill_reservation_bytes,
311 sort_in_place_threshold_bytes,
312 })
313 }
314
315 async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
319 if input.num_rows() == 0 {
320 return Ok(());
321 }
322
323 self.reserve_memory_for_merge()?;
324 self.reserve_memory_for_batch_and_maybe_spill(&input)
325 .await?;
326
327 self.in_mem_batches.push(input);
328 Ok(())
329 }
330
331 fn spilled_before(&self) -> bool {
332 !self.finished_spill_files.is_empty()
333 }
334
335 async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
345 self.merge_reservation.free();
349
350 if self.spilled_before() {
351 if !self.in_mem_batches.is_empty() {
355 self.sort_and_spill_in_mem_batches().await?;
356 }
357
358 StreamingMergeBuilder::new()
359 .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
360 .with_spill_manager(self.spill_manager.clone())
361 .with_schema(Arc::clone(&self.schema))
362 .with_expressions(&self.expr.clone())
363 .with_metrics(self.metrics.baseline.clone())
364 .with_batch_size(self.batch_size)
365 .with_fetch(None)
366 .with_reservation(self.merge_reservation.new_empty())
367 .build()
368 } else {
369 self.in_mem_sort_stream(self.metrics.baseline.clone())
370 }
371 }
372
373 fn used(&self) -> usize {
375 self.reservation.size()
376 }
377
378 fn spilled_bytes(&self) -> usize {
380 self.metrics.spill_metrics.spilled_bytes.value()
381 }
382
383 fn spilled_rows(&self) -> usize {
385 self.metrics.spill_metrics.spilled_rows.value()
386 }
387
388 fn spill_count(&self) -> usize {
390 self.metrics.spill_metrics.spill_file_count.value()
391 }
392
393 async fn consume_and_spill_append(
396 &mut self,
397 globally_sorted_batches: &mut Vec<RecordBatch>,
398 ) -> Result<()> {
399 if globally_sorted_batches.is_empty() {
400 return Ok(());
401 }
402
403 if self.in_progress_spill_file.is_none() {
405 self.in_progress_spill_file =
406 Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
407 }
408
409 Self::organize_stringview_arrays(globally_sorted_batches)?;
410
411 debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
412
413 let batches_to_spill = std::mem::take(globally_sorted_batches);
414 self.reservation.free();
415
416 let (in_progress_file, max_record_batch_size) =
417 self.in_progress_spill_file.as_mut().ok_or_else(|| {
418 internal_datafusion_err!("In-progress spill file should be initialized")
419 })?;
420
421 for batch in batches_to_spill {
422 in_progress_file.append_batch(&batch)?;
423
424 *max_record_batch_size =
425 (*max_record_batch_size).max(batch.get_sliced_size()?);
426 }
427
428 if !globally_sorted_batches.is_empty() {
429 return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
430 }
431
432 Ok(())
433 }
434
435 async fn spill_finish(&mut self) -> Result<()> {
437 let (mut in_progress_file, max_record_batch_memory) =
438 self.in_progress_spill_file.take().ok_or_else(|| {
439 internal_datafusion_err!("Should be called after `spill_append`")
440 })?;
441 let spill_file = in_progress_file.finish()?;
442
443 if let Some(spill_file) = spill_file {
444 self.finished_spill_files.push(SortedSpillFile {
445 file: spill_file,
446 max_record_batch_memory,
447 });
448 }
449
450 Ok(())
451 }
452
453 fn organize_stringview_arrays(
483 globally_sorted_batches: &mut Vec<RecordBatch>,
484 ) -> Result<()> {
485 let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
486
487 for batch in globally_sorted_batches.drain(..) {
488 let mut new_columns: Vec<Arc<dyn Array>> =
489 Vec::with_capacity(batch.num_columns());
490
491 let mut arr_mutated = false;
492 for array in batch.columns() {
493 if let Some(string_view_array) =
494 array.as_any().downcast_ref::<StringViewArray>()
495 {
496 let new_array = string_view_array.gc();
497 new_columns.push(Arc::new(new_array));
498 arr_mutated = true;
499 } else {
500 new_columns.push(Arc::clone(array));
501 }
502 }
503
504 let organized_batch = if arr_mutated {
505 RecordBatch::try_new(batch.schema(), new_columns)?
506 } else {
507 batch
508 };
509
510 organized_batches.push(organized_batch);
511 }
512
513 *globally_sorted_batches = organized_batches;
514
515 Ok(())
516 }
517
518 async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
521 if self.in_mem_batches.is_empty() {
522 return internal_err!(
523 "in_mem_batches must not be empty when attempting to sort and spill"
524 );
525 }
526
527 self.merge_reservation.free();
532
533 let mut sorted_stream =
534 self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
535 if !self.in_mem_batches.is_empty() {
538 return internal_err!(
539 "in_mem_batches should be empty after constructing sorted stream"
540 );
541 }
542 let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
546
547 while let Some(batch) = sorted_stream.next().await {
548 let batch = batch?;
549 let sorted_size = get_reserved_byte_for_record_batch(&batch);
550 if self.reservation.try_grow(sorted_size).is_err() {
551 globally_sorted_batches.push(batch);
555 self.consume_and_spill_append(&mut globally_sorted_batches)
556 .await?; } else {
558 globally_sorted_batches.push(batch);
559 }
560 }
561
562 drop(sorted_stream);
565
566 self.consume_and_spill_append(&mut globally_sorted_batches)
567 .await?;
568 self.spill_finish().await?;
569
570 let buffers_cleared_property =
572 self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
573 if !buffers_cleared_property {
574 return internal_err!(
575 "in_mem_batches and globally_sorted_batches should be cleared before"
576 );
577 }
578
579 self.reserve_memory_for_merge()?;
581
582 Ok(())
583 }
584
585 fn in_mem_sort_stream(
644 &mut self,
645 metrics: BaselineMetrics,
646 ) -> Result<SendableRecordBatchStream> {
647 if self.in_mem_batches.is_empty() {
648 return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
649 &self.schema,
650 ))));
651 }
652
653 let elapsed_compute = metrics.elapsed_compute().clone();
656 let _timer = elapsed_compute.timer();
657
658 if self.in_mem_batches.len() == 1 {
665 let batch = self.in_mem_batches.swap_remove(0);
666 let reservation = self.reservation.take();
667 return self.sort_batch_stream(batch, metrics, reservation, true);
668 }
669
670 if self.reservation.size() < self.sort_in_place_threshold_bytes {
672 let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
674 self.in_mem_batches.clear();
675 self.reservation
676 .try_resize(get_reserved_byte_for_record_batch(&batch))
677 .map_err(Self::err_with_oom_context)?;
678 let reservation = self.reservation.take();
679 return self.sort_batch_stream(batch, metrics, reservation, true);
680 }
681
682 let streams = std::mem::take(&mut self.in_mem_batches)
683 .into_iter()
684 .map(|batch| {
685 let metrics = self.metrics.baseline.intermediate();
686 let reservation = self
687 .reservation
688 .split(get_reserved_byte_for_record_batch(&batch));
689 let input = self.sort_batch_stream(
690 batch,
691 metrics,
692 reservation,
693 false,
696 )?;
697 Ok(spawn_buffered(input, 1))
698 })
699 .collect::<Result<_>>()?;
700
701 StreamingMergeBuilder::new()
702 .with_streams(streams)
703 .with_schema(Arc::clone(&self.schema))
704 .with_expressions(&self.expr.clone())
705 .with_metrics(metrics)
706 .with_batch_size(self.batch_size)
707 .with_fetch(None)
708 .with_reservation(self.merge_reservation.new_empty())
709 .build()
710 }
711
712 fn sort_batch_stream(
721 &self,
722 batch: RecordBatch,
723 metrics: BaselineMetrics,
724 reservation: MemoryReservation,
725 mut split: bool,
726 ) -> Result<SendableRecordBatchStream> {
727 assert_eq!(
728 get_reserved_byte_for_record_batch(&batch),
729 reservation.size()
730 );
731
732 split = split && batch.num_rows() > self.batch_size;
733
734 let schema = batch.schema();
735
736 let expressions = self.expr.clone();
737 let stream = futures::stream::once(async move {
738 let _timer = metrics.elapsed_compute().timer();
739
740 let sorted = sort_batch(&batch, &expressions, None)?;
741
742 metrics.record_output(sorted.num_rows());
743 drop(batch);
744 drop(reservation);
745 Ok(sorted)
746 });
747
748 let mut output: SendableRecordBatchStream =
749 Box::pin(RecordBatchStreamAdapter::new(schema, stream));
750
751 if split {
752 output = Box::pin(BatchSplitStream::new(
753 output,
754 self.batch_size,
755 self.metrics.split_metrics.clone(),
756 ));
757 }
758
759 Ok(output)
760 }
761
762 fn reserve_memory_for_merge(&mut self) -> Result<()> {
766 if self.runtime.disk_manager.tmp_files_enabled() {
768 let size = self.sort_spill_reservation_bytes;
769 if self.merge_reservation.size() != size {
770 self.merge_reservation
771 .try_resize(size)
772 .map_err(Self::err_with_oom_context)?;
773 }
774 }
775
776 Ok(())
777 }
778
779 async fn reserve_memory_for_batch_and_maybe_spill(
782 &mut self,
783 input: &RecordBatch,
784 ) -> Result<()> {
785 let size = get_reserved_byte_for_record_batch(input);
786
787 match self.reservation.try_grow(size) {
788 Ok(_) => Ok(()),
789 Err(e) => {
790 if self.in_mem_batches.is_empty() {
791 return Err(Self::err_with_oom_context(e));
792 }
793
794 self.sort_and_spill_in_mem_batches().await?;
796 self.reservation
797 .try_grow(size)
798 .map_err(Self::err_with_oom_context)
799 }
800 }
801 }
802
803 fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
806 match e {
807 DataFusionError::ResourcesExhausted(_) => e.context(
808 "Not enough memory to continue external sort. \
809 Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
810 ),
811 _ => e,
813 }
814 }
815}
816
/// Returns the number of bytes to reserve for a record batch whose in-memory
/// size is `record_batch_size`: twice that size.
///
/// The multiplication saturates at `usize::MAX` instead of wrapping, so an
/// absurdly large input can never produce a small reservation request.
pub(crate) fn get_reserved_byte_for_record_batch_size(record_batch_size: usize) -> usize {
    record_batch_size.saturating_mul(2)
}
830
831fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
833 get_reserved_byte_for_record_batch_size(get_record_batch_memory_size(batch))
834}
835
836impl Debug for ExternalSorter {
837 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
838 f.debug_struct("ExternalSorter")
839 .field("memory_used", &self.used())
840 .field("spilled_bytes", &self.spilled_bytes())
841 .field("spilled_rows", &self.spilled_rows())
842 .field("spill_count", &self.spill_count())
843 .finish()
844 }
845}
846
847pub fn sort_batch(
848 batch: &RecordBatch,
849 expressions: &LexOrdering,
850 fetch: Option<usize>,
851) -> Result<RecordBatch> {
852 let sort_columns = expressions
853 .iter()
854 .map(|expr| expr.evaluate_to_sort_column(batch))
855 .collect::<Result<Vec<_>>>()?;
856
857 let indices = lexsort_to_indices(&sort_columns, fetch)?;
858 let mut columns = take_arrays(batch.columns(), &indices, None)?;
859
860 columns.iter_mut().for_each(|c| {
865 c.shrink_to_fit();
866 });
867
868 let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
869 Ok(RecordBatch::try_new_with_options(
870 batch.schema(),
871 columns,
872 &options,
873 )?)
874}
875
876#[derive(Debug, Clone)]
881pub struct SortExec {
882 pub(crate) input: Arc<dyn ExecutionPlan>,
884 expr: LexOrdering,
886 metrics_set: ExecutionPlanMetricsSet,
888 preserve_partitioning: bool,
891 fetch: Option<usize>,
893 common_sort_prefix: Vec<PhysicalSortExpr>,
895 cache: PlanProperties,
897 filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
901}
902
903impl SortExec {
904 pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
907 let preserve_partitioning = false;
908 let (cache, sort_prefix) =
909 Self::compute_properties(&input, expr.clone(), preserve_partitioning)
910 .unwrap();
911 Self {
912 expr,
913 input,
914 metrics_set: ExecutionPlanMetricsSet::new(),
915 preserve_partitioning,
916 fetch: None,
917 common_sort_prefix: sort_prefix,
918 cache,
919 filter: None,
920 }
921 }
922
923 pub fn preserve_partitioning(&self) -> bool {
925 self.preserve_partitioning
926 }
927
928 pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
936 self.preserve_partitioning = preserve_partitioning;
937 self.cache = self
938 .cache
939 .with_partitioning(Self::output_partitioning_helper(
940 &self.input,
941 self.preserve_partitioning,
942 ));
943 self
944 }
945
946 fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
948 let children = self
949 .expr
950 .iter()
951 .map(|sort_expr| Arc::clone(&sort_expr.expr))
952 .collect::<Vec<_>>();
953 Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
954 DynamicFilterPhysicalExpr::new(children, lit(true)),
955 ))))
956 }
957
958 fn cloned(&self) -> Self {
959 SortExec {
960 input: Arc::clone(&self.input),
961 expr: self.expr.clone(),
962 metrics_set: self.metrics_set.clone(),
963 preserve_partitioning: self.preserve_partitioning,
964 common_sort_prefix: self.common_sort_prefix.clone(),
965 fetch: self.fetch,
966 cache: self.cache.clone(),
967 filter: self.filter.clone(),
968 }
969 }
970
971 pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
979 let mut cache = self.cache.clone();
980 let is_pipeline_friendly = matches!(
984 self.cache.emission_type,
985 EmissionType::Incremental | EmissionType::Both
986 );
987 if fetch.is_some() && is_pipeline_friendly {
988 cache = cache.with_boundedness(Boundedness::Bounded);
989 }
990 let filter = fetch.is_some().then(|| {
991 self.filter.clone().unwrap_or_else(|| self.create_filter())
993 });
994 let mut new_sort = self.cloned();
995 new_sort.fetch = fetch;
996 new_sort.cache = cache;
997 new_sort.filter = filter;
998 new_sort
999 }
1000
1001 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
1003 &self.input
1004 }
1005
1006 pub fn expr(&self) -> &LexOrdering {
1008 &self.expr
1009 }
1010
1011 pub fn fetch(&self) -> Option<usize> {
1013 self.fetch
1014 }
1015
1016 fn output_partitioning_helper(
1017 input: &Arc<dyn ExecutionPlan>,
1018 preserve_partitioning: bool,
1019 ) -> Partitioning {
1020 if preserve_partitioning {
1022 input.output_partitioning().clone()
1023 } else {
1024 Partitioning::UnknownPartitioning(1)
1025 }
1026 }
1027
1028 fn compute_properties(
1031 input: &Arc<dyn ExecutionPlan>,
1032 sort_exprs: LexOrdering,
1033 preserve_partitioning: bool,
1034 ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
1035 let (sort_prefix, sort_satisfied) = input
1036 .equivalence_properties()
1037 .extract_common_sort_prefix(sort_exprs.clone())?;
1038
1039 let emission_type = if sort_satisfied {
1043 input.pipeline_behavior()
1044 } else {
1045 EmissionType::Final
1046 };
1047
1048 let boundedness = if sort_satisfied {
1054 input.boundedness()
1055 } else {
1056 match input.boundedness() {
1057 Boundedness::Unbounded { .. } => Boundedness::Unbounded {
1058 requires_infinite_memory: true,
1059 },
1060 bounded => bounded,
1061 }
1062 };
1063
1064 let mut eq_properties = input.equivalence_properties().clone();
1067 eq_properties.reorder(sort_exprs)?;
1068
1069 let output_partitioning =
1071 Self::output_partitioning_helper(input, preserve_partitioning);
1072
1073 Ok((
1074 PlanProperties::new(
1075 eq_properties,
1076 output_partitioning,
1077 emission_type,
1078 boundedness,
1079 ),
1080 sort_prefix,
1081 ))
1082 }
1083}
1084
1085impl DisplayAs for SortExec {
1086 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1087 match t {
1088 DisplayFormatType::Default | DisplayFormatType::Verbose => {
1089 let preserve_partitioning = self.preserve_partitioning;
1090 match self.fetch {
1091 Some(fetch) => {
1092 write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
1093 if let Some(filter) = &self.filter {
1094 if let Ok(current) = filter.read().expr().current() {
1095 if !current.eq(&lit(true)) {
1096 write!(f, ", filter=[{current}]")?;
1097 }
1098 }
1099 }
1100 if !self.common_sort_prefix.is_empty() {
1101 write!(f, ", sort_prefix=[")?;
1102 let mut first = true;
1103 for sort_expr in &self.common_sort_prefix {
1104 if first {
1105 first = false;
1106 } else {
1107 write!(f, ", ")?;
1108 }
1109 write!(f, "{sort_expr}")?;
1110 }
1111 write!(f, "]")
1112 } else {
1113 Ok(())
1114 }
1115 }
1116 None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
1117 }
1118 }
1119 DisplayFormatType::TreeRender => match self.fetch {
1120 Some(fetch) => {
1121 writeln!(f, "{}", self.expr)?;
1122 writeln!(f, "limit={fetch}")
1123 }
1124 None => {
1125 writeln!(f, "{}", self.expr)
1126 }
1127 },
1128 }
1129 }
1130}
1131
1132impl ExecutionPlan for SortExec {
1133 fn name(&self) -> &'static str {
1134 match self.fetch {
1135 Some(_) => "SortExec(TopK)",
1136 None => "SortExec",
1137 }
1138 }
1139
1140 fn as_any(&self) -> &dyn Any {
1141 self
1142 }
1143
1144 fn properties(&self) -> &PlanProperties {
1145 &self.cache
1146 }
1147
1148 fn required_input_distribution(&self) -> Vec<Distribution> {
1149 if self.preserve_partitioning {
1150 vec![Distribution::UnspecifiedDistribution]
1151 } else {
1152 vec![Distribution::SinglePartition]
1155 }
1156 }
1157
1158 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1159 vec![&self.input]
1160 }
1161
1162 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
1163 vec![false]
1164 }
1165
1166 fn with_new_children(
1167 self: Arc<Self>,
1168 children: Vec<Arc<dyn ExecutionPlan>>,
1169 ) -> Result<Arc<dyn ExecutionPlan>> {
1170 let mut new_sort = self.cloned();
1171 assert!(
1172 children.len() == 1,
1173 "SortExec should have exactly one child"
1174 );
1175 new_sort.input = Arc::clone(&children[0]);
1176 let (cache, sort_prefix) = Self::compute_properties(
1178 &new_sort.input,
1179 new_sort.expr.clone(),
1180 new_sort.preserve_partitioning,
1181 )?;
1182 new_sort.cache = cache;
1183 new_sort.common_sort_prefix = sort_prefix;
1184
1185 Ok(Arc::new(new_sort))
1186 }
1187
1188 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1189 let children = self.children().into_iter().cloned().collect();
1190 let new_sort = self.with_new_children(children)?;
1191 let mut new_sort = new_sort
1192 .as_any()
1193 .downcast_ref::<SortExec>()
1194 .expect("cloned 1 lines above this line, we know the type")
1195 .clone();
1196 new_sort.filter = Some(new_sort.create_filter());
1198 new_sort.metrics_set = ExecutionPlanMetricsSet::new();
1199
1200 Ok(Arc::new(new_sort))
1201 }
1202
1203 fn execute(
1204 &self,
1205 partition: usize,
1206 context: Arc<TaskContext>,
1207 ) -> Result<SendableRecordBatchStream> {
1208 trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
1209
1210 let mut input = self.input.execute(partition, Arc::clone(&context))?;
1211
1212 let execution_options = &context.session_config().options().execution;
1213
1214 trace!("End SortExec's input.execute for partition: {partition}");
1215
1216 let sort_satisfied = self
1217 .input
1218 .equivalence_properties()
1219 .ordering_satisfy(self.expr.clone())?;
1220
1221 match (sort_satisfied, self.fetch.as_ref()) {
1222 (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
1223 input,
1224 0,
1225 Some(*fetch),
1226 BaselineMetrics::new(&self.metrics_set, partition),
1227 ))),
1228 (true, None) => Ok(input),
1229 (false, Some(fetch)) => {
1230 let filter = self.filter.clone();
1231 let mut topk = TopK::try_new(
1232 partition,
1233 input.schema(),
1234 self.common_sort_prefix.clone(),
1235 self.expr.clone(),
1236 *fetch,
1237 context.session_config().batch_size(),
1238 context.runtime_env(),
1239 &self.metrics_set,
1240 Arc::clone(&unwrap_or_internal_err!(filter)),
1241 )?;
1242 Ok(Box::pin(RecordBatchStreamAdapter::new(
1243 self.schema(),
1244 futures::stream::once(async move {
1245 while let Some(batch) = input.next().await {
1246 let batch = batch?;
1247 topk.insert_batch(batch)?;
1248 if topk.finished {
1249 break;
1250 }
1251 }
1252 topk.emit()
1253 })
1254 .try_flatten(),
1255 )))
1256 }
1257 (false, None) => {
1258 let mut sorter = ExternalSorter::new(
1259 partition,
1260 input.schema(),
1261 self.expr.clone(),
1262 context.session_config().batch_size(),
1263 execution_options.sort_spill_reservation_bytes,
1264 execution_options.sort_in_place_threshold_bytes,
1265 context.session_config().spill_compression(),
1266 &self.metrics_set,
1267 context.runtime_env(),
1268 )?;
1269 Ok(Box::pin(RecordBatchStreamAdapter::new(
1270 self.schema(),
1271 futures::stream::once(async move {
1272 while let Some(batch) = input.next().await {
1273 let batch = batch?;
1274 sorter.insert_batch(batch).await?;
1275 }
1276 sorter.sort().await
1277 })
1278 .try_flatten(),
1279 )))
1280 }
1281 }
1282 }
1283
1284 fn metrics(&self) -> Option<MetricsSet> {
1285 Some(self.metrics_set.clone_inner())
1286 }
1287
1288 fn statistics(&self) -> Result<Statistics> {
1289 self.partition_statistics(None)
1290 }
1291
1292 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1293 if !self.preserve_partitioning() {
1294 return self.input.partition_statistics(None)?.with_fetch(
1295 self.schema(),
1296 self.fetch,
1297 0,
1298 1,
1299 );
1300 }
1301 self.input.partition_statistics(partition)?.with_fetch(
1302 self.schema(),
1303 self.fetch,
1304 0,
1305 1,
1306 )
1307 }
1308
1309 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1310 Some(Arc::new(SortExec::with_fetch(self, limit)))
1311 }
1312
1313 fn fetch(&self) -> Option<usize> {
1314 self.fetch
1315 }
1316
1317 fn cardinality_effect(&self) -> CardinalityEffect {
1318 if self.fetch.is_none() {
1319 CardinalityEffect::Equal
1320 } else {
1321 CardinalityEffect::LowerEqual
1322 }
1323 }
1324
1325 fn try_swapping_with_projection(
1329 &self,
1330 projection: &ProjectionExec,
1331 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1332 if projection.expr().len() >= projection.input().schema().fields().len() {
1334 return Ok(None);
1335 }
1336
1337 let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
1338 else {
1339 return Ok(None);
1340 };
1341
1342 Ok(Some(Arc::new(
1343 SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
1344 .with_fetch(self.fetch())
1345 .with_preserve_partitioning(self.preserve_partitioning()),
1346 )))
1347 }
1348
1349 fn gather_filters_for_pushdown(
1350 &self,
1351 phase: FilterPushdownPhase,
1352 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1353 config: &datafusion_common::config::ConfigOptions,
1354 ) -> Result<FilterDescription> {
1355 if !matches!(phase, FilterPushdownPhase::Post) {
1356 return FilterDescription::from_children(parent_filters, &self.children());
1357 }
1358
1359 let mut child =
1360 ChildFilterDescription::from_child(&parent_filters, self.input())?;
1361
1362 if let Some(filter) = &self.filter {
1363 if config.optimizer.enable_dynamic_filter_pushdown {
1364 child = child.with_self_filter(filter.read().expr());
1365 }
1366 }
1367
1368 Ok(FilterDescription::new().with_child(child))
1369 }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374 use std::collections::HashMap;
1375 use std::pin::Pin;
1376 use std::task::{Context, Poll};
1377
1378 use super::*;
1379 use crate::coalesce_partitions::CoalescePartitionsExec;
1380 use crate::collect;
1381 use crate::execution_plan::Boundedness;
1382 use crate::expressions::col;
1383 use crate::test;
1384 use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
1385 use crate::test::TestMemoryExec;
1386 use crate::test::{assert_is_pending, make_partition};
1387
1388 use arrow::array::*;
1389 use arrow::compute::SortOptions;
1390 use arrow::datatypes::*;
1391 use datafusion_common::cast::as_primitive_array;
1392 use datafusion_common::test_util::batches_to_string;
1393 use datafusion_common::{DataFusionError, Result, ScalarValue};
1394 use datafusion_execution::config::SessionConfig;
1395 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1396 use datafusion_execution::RecordBatchStream;
1397 use datafusion_physical_expr::expressions::{Column, Literal};
1398 use datafusion_physical_expr::EquivalenceProperties;
1399
1400 use futures::{FutureExt, Stream};
1401 use insta::assert_snapshot;
1402
1403 #[derive(Debug, Clone)]
1404 pub struct SortedUnboundedExec {
1405 schema: Schema,
1406 batch_size: u64,
1407 cache: PlanProperties,
1408 }
1409
1410 impl DisplayAs for SortedUnboundedExec {
1411 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1412 match t {
1413 DisplayFormatType::Default
1414 | DisplayFormatType::Verbose
1415 | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1416 }
1417 Ok(())
1418 }
1419 }
1420
1421 impl SortedUnboundedExec {
1422 fn compute_properties(schema: SchemaRef) -> PlanProperties {
1423 let mut eq_properties = EquivalenceProperties::new(schema);
1424 eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
1425 Column::new("c1", 0),
1426 ))]);
1427 PlanProperties::new(
1428 eq_properties,
1429 Partitioning::UnknownPartitioning(1),
1430 EmissionType::Final,
1431 Boundedness::Unbounded {
1432 requires_infinite_memory: false,
1433 },
1434 )
1435 }
1436 }
1437
1438 impl ExecutionPlan for SortedUnboundedExec {
1439 fn name(&self) -> &'static str {
1440 Self::static_name()
1441 }
1442
1443 fn as_any(&self) -> &dyn Any {
1444 self
1445 }
1446
1447 fn properties(&self) -> &PlanProperties {
1448 &self.cache
1449 }
1450
1451 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1452 vec![]
1453 }
1454
1455 fn with_new_children(
1456 self: Arc<Self>,
1457 _: Vec<Arc<dyn ExecutionPlan>>,
1458 ) -> Result<Arc<dyn ExecutionPlan>> {
1459 Ok(self)
1460 }
1461
1462 fn execute(
1463 &self,
1464 _partition: usize,
1465 _context: Arc<TaskContext>,
1466 ) -> Result<SendableRecordBatchStream> {
1467 Ok(Box::pin(SortedUnboundedStream {
1468 schema: Arc::new(self.schema.clone()),
1469 batch_size: self.batch_size,
1470 offset: 0,
1471 }))
1472 }
1473 }
1474
1475 #[derive(Debug)]
1476 pub struct SortedUnboundedStream {
1477 schema: SchemaRef,
1478 batch_size: u64,
1479 offset: u64,
1480 }
1481
1482 impl Stream for SortedUnboundedStream {
1483 type Item = Result<RecordBatch>;
1484
1485 fn poll_next(
1486 mut self: Pin<&mut Self>,
1487 _cx: &mut Context<'_>,
1488 ) -> Poll<Option<Self::Item>> {
1489 let batch = SortedUnboundedStream::create_record_batch(
1490 Arc::clone(&self.schema),
1491 self.offset,
1492 self.batch_size,
1493 );
1494 self.offset += self.batch_size;
1495 Poll::Ready(Some(Ok(batch)))
1496 }
1497 }
1498
1499 impl RecordBatchStream for SortedUnboundedStream {
1500 fn schema(&self) -> SchemaRef {
1501 Arc::clone(&self.schema)
1502 }
1503 }
1504
1505 impl SortedUnboundedStream {
1506 fn create_record_batch(
1507 schema: SchemaRef,
1508 offset: u64,
1509 batch_size: u64,
1510 ) -> RecordBatch {
1511 let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1512 let array = UInt64Array::from(values);
1513 let array_ref: ArrayRef = Arc::new(array);
1514 RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1515 }
1516 }
1517
1518 #[tokio::test]
1519 async fn test_in_mem_sort() -> Result<()> {
1520 let task_ctx = Arc::new(TaskContext::default());
1521 let partitions = 4;
1522 let csv = test::scan_partitioned(partitions);
1523 let schema = csv.schema();
1524
1525 let sort_exec = Arc::new(SortExec::new(
1526 [PhysicalSortExpr {
1527 expr: col("i", &schema)?,
1528 options: SortOptions::default(),
1529 }]
1530 .into(),
1531 Arc::new(CoalescePartitionsExec::new(csv)),
1532 ));
1533
1534 let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
1535
1536 assert_eq!(result.len(), 1);
1537 assert_eq!(result[0].num_rows(), 400);
1538 assert_eq!(
1539 task_ctx.runtime_env().memory_pool.reserved(),
1540 0,
1541 "The sort should have returned all memory used back to the memory manager"
1542 );
1543
1544 Ok(())
1545 }
1546
1547 #[tokio::test]
1548 async fn test_sort_spill() -> Result<()> {
1549 let session_config = SessionConfig::new();
1551 let sort_spill_reservation_bytes = session_config
1552 .options()
1553 .execution
1554 .sort_spill_reservation_bytes;
1555 let runtime = RuntimeEnvBuilder::new()
1556 .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
1557 .build_arc()?;
1558 let task_ctx = Arc::new(
1559 TaskContext::default()
1560 .with_session_config(session_config)
1561 .with_runtime(runtime),
1562 );
1563
1564 let partitions = 100;
1568 let input = test::scan_partitioned(partitions);
1569 let schema = input.schema();
1570
1571 let sort_exec = Arc::new(SortExec::new(
1572 [PhysicalSortExpr {
1573 expr: col("i", &schema)?,
1574 options: SortOptions::default(),
1575 }]
1576 .into(),
1577 Arc::new(CoalescePartitionsExec::new(input)),
1578 ));
1579
1580 let result = collect(
1581 Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
1582 Arc::clone(&task_ctx),
1583 )
1584 .await?;
1585
1586 assert_eq!(result.len(), 2);
1587
1588 let metrics = sort_exec.metrics().unwrap();
1590
1591 assert_eq!(metrics.output_rows().unwrap(), 10000);
1592 assert!(metrics.elapsed_compute().unwrap() > 0);
1593
1594 let spill_count = metrics.spill_count().unwrap();
1595 let spilled_rows = metrics.spilled_rows().unwrap();
1596 let spilled_bytes = metrics.spilled_bytes().unwrap();
1597 assert!((3..=10).contains(&spill_count));
1601 assert!((9000..=10000).contains(&spilled_rows));
1602 assert!((38000..=44000).contains(&spilled_bytes));
1603
1604 let columns = result[0].columns();
1605
1606 let i = as_primitive_array::<Int32Type>(&columns[0])?;
1607 assert_eq!(i.value(0), 0);
1608 assert_eq!(i.value(i.len() - 1), 81);
1609 assert_eq!(
1610 task_ctx.runtime_env().memory_pool.reserved(),
1611 0,
1612 "The sort should have returned all memory used back to the memory manager"
1613 );
1614
1615 Ok(())
1616 }
1617
1618 #[tokio::test]
1619 async fn test_batch_reservation_error() -> Result<()> {
1620 let expected_batch_reservation = 800;
1623 let merge_reservation: usize = 0; let memory_limit: usize = expected_batch_reservation + merge_reservation - 1; let session_config =
1627 SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
1628 let runtime = RuntimeEnvBuilder::new()
1629 .with_memory_limit(memory_limit, 1.0)
1630 .build_arc()?;
1631 let task_ctx = Arc::new(
1632 TaskContext::default()
1633 .with_session_config(session_config)
1634 .with_runtime(runtime),
1635 );
1636
1637 let plan = test::scan_partitioned(1);
1638
1639 {
1642 let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
1643 let first_batch = stream.next().await.unwrap()?;
1644 let batch_reservation = get_reserved_byte_for_record_batch(&first_batch);
1645
1646 assert_eq!(batch_reservation, expected_batch_reservation);
1647 assert!(memory_limit < (merge_reservation + batch_reservation));
1648 }
1649
1650 let sort_exec = Arc::new(SortExec::new(
1651 [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
1652 plan,
1653 ));
1654
1655 let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;
1656
1657 let err = result.unwrap_err();
1658 assert!(
1659 matches!(err, DataFusionError::Context(..)),
1660 "Assertion failed: expected a Context error, but got: {err:?}"
1661 );
1662
1663 assert!(
1665 matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
1666 "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
1667 );
1668
1669 Ok(())
1670 }
1671
1672 #[tokio::test]
1673 async fn test_sort_spill_utf8_strings() -> Result<()> {
1674 let session_config = SessionConfig::new()
1675 .with_batch_size(100)
1676 .with_sort_in_place_threshold_bytes(20 * 1024)
1677 .with_sort_spill_reservation_bytes(100 * 1024);
1678 let runtime = RuntimeEnvBuilder::new()
1679 .with_memory_limit(500 * 1024, 1.0)
1680 .build_arc()?;
1681 let task_ctx = Arc::new(
1682 TaskContext::default()
1683 .with_session_config(session_config)
1684 .with_runtime(runtime),
1685 );
1686
1687 let input = test::scan_partitioned_utf8(200);
1691 let schema = input.schema();
1692
1693 let sort_exec = Arc::new(SortExec::new(
1694 [PhysicalSortExpr {
1695 expr: col("i", &schema)?,
1696 options: SortOptions::default(),
1697 }]
1698 .into(),
1699 Arc::new(CoalescePartitionsExec::new(input)),
1700 ));
1701
1702 let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1703
1704 let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
1705 assert_eq!(num_rows, 20000);
1706
1707 let metrics = sort_exec.metrics().unwrap();
1709
1710 assert_eq!(metrics.output_rows().unwrap(), 20000);
1711 assert!(metrics.elapsed_compute().unwrap() > 0);
1712
1713 let spill_count = metrics.spill_count().unwrap();
1714 let spilled_rows = metrics.spilled_rows().unwrap();
1715 let spilled_bytes = metrics.spilled_bytes().unwrap();
1716
1717 assert!((4..=8).contains(&spill_count));
1731 assert!((15000..=20000).contains(&spilled_rows));
1732 assert!((900000..=1000000).contains(&spilled_bytes));
1733
1734 let concated_result = concat_batches(&schema, &result)?;
1736 let columns = concated_result.columns();
1737 let string_array = as_string_array(&columns[0]);
1738 for i in 0..string_array.len() - 1 {
1739 assert!(string_array.value(i) <= string_array.value(i + 1));
1740 }
1741
1742 assert_eq!(
1743 task_ctx.runtime_env().memory_pool.reserved(),
1744 0,
1745 "The sort should have returned all memory used back to the memory manager"
1746 );
1747
1748 Ok(())
1749 }
1750
1751 #[tokio::test]
1752 async fn test_sort_fetch_memory_calculation() -> Result<()> {
1753 let avg_batch_size = 400;
1755 let partitions = 4;
1756
1757 let test_options = vec![
1759 (None, true),
1762 (Some(1), false),
1765 ];
1766
1767 for (fetch, expect_spillage) in test_options {
1768 let session_config = SessionConfig::new();
1769 let sort_spill_reservation_bytes = session_config
1770 .options()
1771 .execution
1772 .sort_spill_reservation_bytes;
1773
1774 let runtime = RuntimeEnvBuilder::new()
1775 .with_memory_limit(
1776 sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1777 1.0,
1778 )
1779 .build_arc()?;
1780 let task_ctx = Arc::new(
1781 TaskContext::default()
1782 .with_runtime(runtime)
1783 .with_session_config(session_config),
1784 );
1785
1786 let csv = test::scan_partitioned(partitions);
1787 let schema = csv.schema();
1788
1789 let sort_exec = Arc::new(
1790 SortExec::new(
1791 [PhysicalSortExpr {
1792 expr: col("i", &schema)?,
1793 options: SortOptions::default(),
1794 }]
1795 .into(),
1796 Arc::new(CoalescePartitionsExec::new(csv)),
1797 )
1798 .with_fetch(fetch),
1799 );
1800
1801 let result =
1802 collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1803 assert_eq!(result.len(), 1);
1804
1805 let metrics = sort_exec.metrics().unwrap();
1806 let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
1807 assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
1808 }
1809 Ok(())
1810 }
1811
1812 #[tokio::test]
1813 async fn test_sort_metadata() -> Result<()> {
1814 let task_ctx = Arc::new(TaskContext::default());
1815 let field_metadata: HashMap<String, String> =
1816 vec![("foo".to_string(), "bar".to_string())]
1817 .into_iter()
1818 .collect();
1819 let schema_metadata: HashMap<String, String> =
1820 vec![("baz".to_string(), "barf".to_string())]
1821 .into_iter()
1822 .collect();
1823
1824 let mut field = Field::new("field_name", DataType::UInt64, true);
1825 field.set_metadata(field_metadata.clone());
1826 let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1827 let schema = Arc::new(schema);
1828
1829 let data: ArrayRef =
1830 Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1831
1832 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1833 let input =
1834 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1835
1836 let sort_exec = Arc::new(SortExec::new(
1837 [PhysicalSortExpr {
1838 expr: col("field_name", &schema)?,
1839 options: SortOptions::default(),
1840 }]
1841 .into(),
1842 input,
1843 ));
1844
1845 let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1846
1847 let expected_data: ArrayRef =
1848 Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1849 let expected_batch =
1850 RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
1851
1852 assert_eq!(&vec![expected_batch], &result);
1854
1855 assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
1857 assert_eq!(result[0].schema().metadata(), &schema_metadata);
1858
1859 Ok(())
1860 }
1861
1862 #[tokio::test]
1863 async fn test_lex_sort_by_mixed_types() -> Result<()> {
1864 let task_ctx = Arc::new(TaskContext::default());
1865 let schema = Arc::new(Schema::new(vec![
1866 Field::new("a", DataType::Int32, true),
1867 Field::new(
1868 "b",
1869 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1870 true,
1871 ),
1872 ]));
1873
1874 let batch = RecordBatch::try_new(
1876 Arc::clone(&schema),
1877 vec![
1878 Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
1879 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1880 Some(vec![Some(3)]),
1881 Some(vec![Some(1)]),
1882 Some(vec![Some(6), None]),
1883 Some(vec![Some(5)]),
1884 ])),
1885 ],
1886 )?;
1887
1888 let sort_exec = Arc::new(SortExec::new(
1889 [
1890 PhysicalSortExpr {
1891 expr: col("a", &schema)?,
1892 options: SortOptions {
1893 descending: false,
1894 nulls_first: true,
1895 },
1896 },
1897 PhysicalSortExpr {
1898 expr: col("b", &schema)?,
1899 options: SortOptions {
1900 descending: true,
1901 nulls_first: false,
1902 },
1903 },
1904 ]
1905 .into(),
1906 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
1907 ));
1908
1909 assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
1910 assert_eq!(
1911 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1912 *sort_exec.schema().field(1).data_type()
1913 );
1914
1915 let result: Vec<RecordBatch> =
1916 collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
1917 let metrics = sort_exec.metrics().unwrap();
1918 assert!(metrics.elapsed_compute().unwrap() > 0);
1919 assert_eq!(metrics.output_rows().unwrap(), 4);
1920 assert_eq!(result.len(), 1);
1921
1922 let expected = RecordBatch::try_new(
1923 schema,
1924 vec![
1925 Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
1926 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1927 Some(vec![Some(1)]),
1928 Some(vec![Some(6), None]),
1929 Some(vec![Some(5)]),
1930 Some(vec![Some(3)]),
1931 ])),
1932 ],
1933 )?;
1934
1935 assert_eq!(expected, result[0]);
1936
1937 Ok(())
1938 }
1939
1940 #[tokio::test]
1941 async fn test_lex_sort_by_float() -> Result<()> {
1942 let task_ctx = Arc::new(TaskContext::default());
1943 let schema = Arc::new(Schema::new(vec![
1944 Field::new("a", DataType::Float32, true),
1945 Field::new("b", DataType::Float64, true),
1946 ]));
1947
1948 let batch = RecordBatch::try_new(
1950 Arc::clone(&schema),
1951 vec![
1952 Arc::new(Float32Array::from(vec![
1953 Some(f32::NAN),
1954 None,
1955 None,
1956 Some(f32::NAN),
1957 Some(1.0_f32),
1958 Some(1.0_f32),
1959 Some(2.0_f32),
1960 Some(3.0_f32),
1961 ])),
1962 Arc::new(Float64Array::from(vec![
1963 Some(200.0_f64),
1964 Some(20.0_f64),
1965 Some(10.0_f64),
1966 Some(100.0_f64),
1967 Some(f64::NAN),
1968 None,
1969 None,
1970 Some(f64::NAN),
1971 ])),
1972 ],
1973 )?;
1974
1975 let sort_exec = Arc::new(SortExec::new(
1976 [
1977 PhysicalSortExpr {
1978 expr: col("a", &schema)?,
1979 options: SortOptions {
1980 descending: true,
1981 nulls_first: true,
1982 },
1983 },
1984 PhysicalSortExpr {
1985 expr: col("b", &schema)?,
1986 options: SortOptions {
1987 descending: false,
1988 nulls_first: false,
1989 },
1990 },
1991 ]
1992 .into(),
1993 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
1994 ));
1995
1996 assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
1997 assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
1998
1999 let result: Vec<RecordBatch> =
2000 collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
2001 let metrics = sort_exec.metrics().unwrap();
2002 assert!(metrics.elapsed_compute().unwrap() > 0);
2003 assert_eq!(metrics.output_rows().unwrap(), 8);
2004 assert_eq!(result.len(), 1);
2005
2006 let columns = result[0].columns();
2007
2008 assert_eq!(DataType::Float32, *columns[0].data_type());
2009 assert_eq!(DataType::Float64, *columns[1].data_type());
2010
2011 let a = as_primitive_array::<Float32Type>(&columns[0])?;
2012 let b = as_primitive_array::<Float64Type>(&columns[1])?;
2013
2014 let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
2016 .map(|i| {
2017 let aval = if a.is_valid(i) {
2018 Some(a.value(i).to_string())
2019 } else {
2020 None
2021 };
2022 let bval = if b.is_valid(i) {
2023 Some(b.value(i).to_string())
2024 } else {
2025 None
2026 };
2027 (aval, bval)
2028 })
2029 .collect();
2030
2031 let expected: Vec<(Option<String>, Option<String>)> = vec![
2032 (None, Some("10".to_owned())),
2033 (None, Some("20".to_owned())),
2034 (Some("NaN".to_owned()), Some("100".to_owned())),
2035 (Some("NaN".to_owned()), Some("200".to_owned())),
2036 (Some("3".to_owned()), Some("NaN".to_owned())),
2037 (Some("2".to_owned()), None),
2038 (Some("1".to_owned()), Some("NaN".to_owned())),
2039 (Some("1".to_owned()), None),
2040 ];
2041
2042 assert_eq!(expected, result);
2043
2044 Ok(())
2045 }
2046
2047 #[tokio::test]
2048 async fn test_drop_cancel() -> Result<()> {
2049 let task_ctx = Arc::new(TaskContext::default());
2050 let schema =
2051 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2052
2053 let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2054 let refs = blocking_exec.refs();
2055 let sort_exec = Arc::new(SortExec::new(
2056 [PhysicalSortExpr {
2057 expr: col("a", &schema)?,
2058 options: SortOptions::default(),
2059 }]
2060 .into(),
2061 blocking_exec,
2062 ));
2063
2064 let fut = collect(sort_exec, Arc::clone(&task_ctx));
2065 let mut fut = fut.boxed();
2066
2067 assert_is_pending(&mut fut);
2068 drop(fut);
2069 assert_strong_count_converges_to_zero(refs).await;
2070
2071 assert_eq!(
2072 task_ctx.runtime_env().memory_pool.reserved(),
2073 0,
2074 "The sort should have returned all memory used back to the memory manager"
2075 );
2076
2077 Ok(())
2078 }
2079
/// `sort_batch` on a batch with no columns but an explicit row count of one,
/// sorted by a literal expression, must still report one row.
#[test]
fn test_empty_sort_batch() {
    let schema = Arc::new(Schema::empty());
    // With no columns, the row count has to be supplied through the options.
    let one_row = RecordBatchOptions::new().with_row_count(Some(1));
    let batch =
        RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &one_row)
            .unwrap();

    let literal_key = PhysicalSortExpr {
        expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
        options: SortOptions::default(),
    };
    let expressions: LexOrdering = [literal_key].into();

    let sorted = sort_batch(&batch, &expressions, None).unwrap();
    assert_eq!(sorted.num_rows(), 1);
}
2097
2098 #[tokio::test]
2099 async fn topk_unbounded_source() -> Result<()> {
2100 let task_ctx = Arc::new(TaskContext::default());
2101 let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
2102 let source = SortedUnboundedExec {
2103 schema: schema.clone(),
2104 batch_size: 2,
2105 cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
2106 };
2107 let mut plan = SortExec::new(
2108 [PhysicalSortExpr::new_default(Arc::new(Column::new(
2109 "c1", 0,
2110 )))]
2111 .into(),
2112 Arc::new(source),
2113 );
2114 plan = plan.with_fetch(Some(9));
2115
2116 let batches = collect(Arc::new(plan), task_ctx).await?;
2117 assert_snapshot!(batches_to_string(&batches), @r#"
2118 +----+
2119 | c1 |
2120 +----+
2121 | 0 |
2122 | 1 |
2123 | 2 |
2124 | 3 |
2125 | 4 |
2126 | 5 |
2127 | 6 |
2128 | 7 |
2129 | 8 |
2130 +----+
2131 "#);
2132 Ok(())
2133 }
2134
2135 #[tokio::test]
2136 async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2137 let batch_size = 100;
2138
2139 let create_task_ctx = |_: &[RecordBatch]| {
2140 TaskContext::default().with_session_config(
2141 SessionConfig::new()
2142 .with_batch_size(batch_size)
2143 .with_sort_in_place_threshold_bytes(usize::MAX),
2144 )
2145 };
2146
2147 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2149
2150 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2152
2153 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2155
2156 Ok(())
2157 }
2158
2159 #[tokio::test]
2160 async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place(
2161 ) -> Result<()> {
2162 let batch_size = 100;
2163
2164 let create_task_ctx = |_: &[RecordBatch]| {
2165 TaskContext::default().with_session_config(
2166 SessionConfig::new()
2167 .with_batch_size(batch_size)
2168 .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2169 )
2170 };
2171
2172 {
2174 let metrics =
2175 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2176
2177 assert_eq!(
2178 metrics.spill_count(),
2179 Some(0),
2180 "Expected no spills when sorting in place"
2181 );
2182 }
2183
2184 {
2186 let metrics =
2187 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2188
2189 assert_eq!(
2190 metrics.spill_count(),
2191 Some(0),
2192 "Expected no spills when sorting in place"
2193 );
2194 }
2195
2196 {
2198 let metrics =
2199 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2200
2201 assert_eq!(
2202 metrics.spill_count(),
2203 Some(0),
2204 "Expected no spills when sorting in place"
2205 );
2206 }
2207
2208 Ok(())
2209 }
2210
2211 #[tokio::test]
2212 async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch(
2213 ) -> Result<()> {
2214 let batch_size = 100;
2215
2216 let create_task_ctx = |_: &[RecordBatch]| {
2217 TaskContext::default()
2218 .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2219 };
2220
2221 {
2223 let metrics = test_sort_output_batch_size(
2224 1,
2226 batch_size / 4,
2227 create_task_ctx,
2228 )
2229 .await?;
2230
2231 assert_eq!(
2232 metrics.spill_count(),
2233 Some(0),
2234 "Expected no spills when sorting in place"
2235 );
2236 }
2237
2238 {
2240 let metrics = test_sort_output_batch_size(
2241 1,
2243 batch_size + 7,
2244 create_task_ctx,
2245 )
2246 .await?;
2247
2248 assert_eq!(
2249 metrics.spill_count(),
2250 Some(0),
2251 "Expected no spills when sorting in place"
2252 );
2253 }
2254
2255 {
2257 let metrics = test_sort_output_batch_size(
2258 1,
2260 batch_size * 3,
2261 create_task_ctx,
2262 )
2263 .await?;
2264
2265 assert_eq!(
2266 metrics.spill_count(),
2267 Some(0),
2268 "Expected no spills when sorting in place"
2269 );
2270 }
2271
2272 Ok(())
2273 }
2274
2275 #[tokio::test]
2276 async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill(
2277 ) -> Result<()> {
2278 let batch_size = 100;
2279
2280 let create_task_ctx = |generated_batches: &[RecordBatch]| {
2281 let batches_memory = generated_batches
2282 .iter()
2283 .map(|b| b.get_array_memory_size())
2284 .sum::<usize>();
2285
2286 TaskContext::default()
2287 .with_session_config(
2288 SessionConfig::new()
2289 .with_batch_size(batch_size)
2290 .with_sort_in_place_threshold_bytes(1)
2292 .with_sort_spill_reservation_bytes(1),
2293 )
2294 .with_runtime(
2295 RuntimeEnvBuilder::default()
2296 .with_memory_limit(batches_memory, 1.0)
2297 .build_arc()
2298 .unwrap(),
2299 )
2300 };
2301
2302 {
2304 let metrics =
2305 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2306
2307 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2308 }
2309
2310 {
2312 let metrics =
2313 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2314
2315 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2316 }
2317
2318 {
2320 let metrics =
2321 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2322
2323 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2324 }
2325
2326 Ok(())
2327 }
2328
2329 async fn test_sort_output_batch_size(
2330 number_of_batches: usize,
2331 batch_size_to_generate: usize,
2332 create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2333 ) -> Result<MetricsSet> {
2334 let batches = (0..number_of_batches)
2335 .map(|_| make_partition(batch_size_to_generate as i32))
2336 .collect::<Vec<_>>();
2337 let task_ctx = create_task_ctx(batches.as_slice());
2338
2339 let expected_batch_size = task_ctx.session_config().batch_size();
2340
2341 let (mut output_batches, metrics) =
2342 run_sort_on_input(task_ctx, "i", batches).await?;
2343
2344 let last_batch = output_batches.pop().unwrap();
2345
2346 for batch in output_batches {
2347 assert_eq!(batch.num_rows(), expected_batch_size);
2348 }
2349
2350 let mut last_expected_batch_size =
2351 (batch_size_to_generate * number_of_batches) % expected_batch_size;
2352 if last_expected_batch_size == 0 {
2353 last_expected_batch_size = expected_batch_size;
2354 }
2355 assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2356
2357 Ok(metrics)
2358 }
2359
2360 async fn run_sort_on_input(
2361 task_ctx: TaskContext,
2362 order_by_col: &str,
2363 batches: Vec<RecordBatch>,
2364 ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2365 let task_ctx = Arc::new(task_ctx);
2366
2367 let schema = batches[0].schema();
2369 let ordering: LexOrdering = [PhysicalSortExpr {
2370 expr: col(order_by_col, &schema)?,
2371 options: SortOptions {
2372 descending: false,
2373 nulls_first: true,
2374 },
2375 }]
2376 .into();
2377 let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2378 ordering.clone(),
2379 TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2380 ));
2381
2382 let sorted_batches =
2383 collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2384
2385 let metrics = sort_exec.metrics().expect("sort have metrics");
2386
2387 {
2389 let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2390 let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2391
2392 let sorted_batches_concat =
2393 concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2394
2395 assert_eq!(sorted_input_batch, sorted_batches_concat);
2396 }
2397
2398 Ok((sorted_batches, metrics))
2399 }
2400}