1use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use crate::common::spawn_buffered;
28use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
29use crate::expressions::PhysicalSortExpr;
30use crate::limit::LimitStream;
31use crate::metrics::{
32 BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
33};
34use crate::projection::{make_with_child, update_expr, ProjectionExec};
35use crate::sorts::streaming_merge::StreamingMergeBuilder;
36use crate::spill::get_record_batch_memory_size;
37use crate::spill::in_progress_spill_file::InProgressSpillFile;
38use crate::spill::spill_manager::SpillManager;
39use crate::stream::RecordBatchStreamAdapter;
40use crate::topk::TopK;
41use crate::{
42 DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
43 ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
44 Statistics,
45};
46
47use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
48use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
49use arrow::datatypes::SchemaRef;
50use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
51use datafusion_execution::disk_manager::RefCountedTempFile;
52use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
53use datafusion_execution::runtime_env::RuntimeEnv;
54use datafusion_execution::TaskContext;
55use datafusion_physical_expr::LexOrdering;
56use datafusion_physical_expr_common::sort_expr::LexRequirement;
57
58use futures::{StreamExt, TryStreamExt};
59use log::{debug, trace};
60
/// Metric handles used by `ExternalSorter`.
struct ExternalSorterMetrics {
    /// Baseline operator metrics for this partition.
    baseline: BaselineMetrics,

    /// Spill-related metrics; `ExternalSorter::new` passes a clone of these
    /// to its `SpillManager`.
    spill_metrics: SpillMetrics,
}
67
68impl ExternalSorterMetrics {
69 fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
70 Self {
71 baseline: BaselineMetrics::new(metrics, partition),
72 spill_metrics: SpillMetrics::new(metrics, partition),
73 }
74 }
75}
76
/// Sorts an arbitrarily sized stream of `RecordBatch`es, buffering batches in
/// memory and writing sorted runs to spill files when a memory reservation
/// cannot be grown.
struct ExternalSorter {
    /// Schema of the sorted output; also given to the `SpillManager`.
    schema: SchemaRef,
    /// Sort expressions, converted from the `LexOrdering` passed to `new`.
    expr: Arc<[PhysicalSortExpr]>,
    /// Batch size passed to the streaming merge builders.
    batch_size: usize,
    /// When `reservation` is smaller than this, buffered batches are
    /// concatenated and sorted as a single batch (see `in_mem_sort_stream`).
    sort_in_place_threshold_bytes: usize,

    /// Unsorted batches received through `insert_batch` and not yet spilled.
    in_mem_batches: Vec<RecordBatch>,

    /// Spill file currently being appended to, if any.
    in_progress_spill_file: Option<InProgressSpillFile>,
    /// Completed spill files, pushed by `spill_finish` and drained by `sort`.
    finished_spill_files: Vec<RefCountedTempFile>,

    /// Baseline and spill metrics for this sorter.
    metrics: ExternalSorterMetrics,
    /// Runtime environment; supplies the memory pool and the disk manager.
    runtime: Arc<RuntimeEnv>,
    /// Reservation tracking memory for buffered and sorted batches
    /// (registered as `ExternalSorter[partition_id]`, spillable).
    reservation: MemoryReservation,
    /// Creates in-progress spill files and reads finished ones back as streams.
    spill_manager: SpillManager,

    /// Reservation held for the merge phase
    /// (registered as `ExternalSorterMerge[partition_id]`).
    merge_reservation: MemoryReservation,
    /// Size `merge_reservation` is resized to by `reserve_memory_for_merge`.
    sort_spill_reservation_bytes: usize,
}
247
248impl ExternalSorter {
249 #[allow(clippy::too_many_arguments)]
252 pub fn new(
253 partition_id: usize,
254 schema: SchemaRef,
255 expr: LexOrdering,
256 batch_size: usize,
257 sort_spill_reservation_bytes: usize,
258 sort_in_place_threshold_bytes: usize,
259 metrics: &ExecutionPlanMetricsSet,
260 runtime: Arc<RuntimeEnv>,
261 ) -> Result<Self> {
262 let metrics = ExternalSorterMetrics::new(metrics, partition_id);
263 let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
264 .with_can_spill(true)
265 .register(&runtime.memory_pool);
266
267 let merge_reservation =
268 MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
269 .register(&runtime.memory_pool);
270
271 let spill_manager = SpillManager::new(
272 Arc::clone(&runtime),
273 metrics.spill_metrics.clone(),
274 Arc::clone(&schema),
275 );
276
277 Ok(Self {
278 schema,
279 in_mem_batches: vec![],
280 in_progress_spill_file: None,
281 finished_spill_files: vec![],
282 expr: expr.into(),
283 metrics,
284 reservation,
285 spill_manager,
286 merge_reservation,
287 runtime,
288 batch_size,
289 sort_spill_reservation_bytes,
290 sort_in_place_threshold_bytes,
291 })
292 }
293
294 async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
298 if input.num_rows() == 0 {
299 return Ok(());
300 }
301
302 self.reserve_memory_for_merge()?;
303 self.reserve_memory_for_batch_and_maybe_spill(&input)
304 .await?;
305
306 self.in_mem_batches.push(input);
307 Ok(())
308 }
309
/// Returns `true` if at least one spill file has been finished.
fn spilled_before(&self) -> bool {
    !self.finished_spill_files.is_empty()
}
313
314 async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
324 self.merge_reservation.free();
328
329 if self.spilled_before() {
330 let mut streams = vec![];
331
332 if !self.in_mem_batches.is_empty() {
336 self.sort_and_spill_in_mem_batches().await?;
337 }
338
339 for spill in self.finished_spill_files.drain(..) {
340 if !spill.path().exists() {
341 return internal_err!("Spill file {:?} does not exist", spill.path());
342 }
343 let stream = self.spill_manager.read_spill_as_stream(spill)?;
344 streams.push(stream);
345 }
346
347 let expressions: LexOrdering = self.expr.iter().cloned().collect();
348
349 StreamingMergeBuilder::new()
350 .with_streams(streams)
351 .with_schema(Arc::clone(&self.schema))
352 .with_expressions(expressions.as_ref())
353 .with_metrics(self.metrics.baseline.clone())
354 .with_batch_size(self.batch_size)
355 .with_fetch(None)
356 .with_reservation(self.merge_reservation.new_empty())
357 .build()
358 } else {
359 self.in_mem_sort_stream(self.metrics.baseline.clone())
360 }
361 }
362
/// Current size of `self.reservation`.
fn used(&self) -> usize {
    self.reservation.size()
}
367
/// Current value of the `spilled_bytes` spill metric.
fn spilled_bytes(&self) -> usize {
    self.metrics.spill_metrics.spilled_bytes.value()
}
372
/// Current value of the `spilled_rows` spill metric.
fn spilled_rows(&self) -> usize {
    self.metrics.spill_metrics.spilled_rows.value()
}
377
/// Current value of the `spill_file_count` spill metric.
fn spill_count(&self) -> usize {
    self.metrics.spill_metrics.spill_file_count.value()
}
382
/// Appends every batch in `globally_sorted_batches` to the in-progress
/// spill file, creating that file first if there is none.
///
/// On success `globally_sorted_batches` is left empty and
/// `self.reservation` has been freed. Does nothing when the input vector is
/// already empty.
///
/// # Errors
/// Propagates failures from creating the spill file, reorganizing the
/// batches, or appending to the file.
async fn consume_and_spill_append(
    &mut self,
    globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()> {
    if globally_sorted_batches.is_empty() {
        return Ok(());
    }

    // Lazily open the spill file on the first append.
    if self.in_progress_spill_file.is_none() {
        self.in_progress_spill_file =
            Some(self.spill_manager.create_in_progress_file("Sorting")?);
    }

    // Rebuild any `StringViewArray` columns via `gc()` before writing.
    Self::organize_stringview_arrays(globally_sorted_batches)?;

    debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

    // Take ownership of the batches and release the memory accounted for them.
    let batches_to_spill = std::mem::take(globally_sorted_batches);
    self.reservation.free();

    let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
        internal_datafusion_err!("In-progress spill file should be initialized")
    })?;

    for batch in batches_to_spill {
        in_progress_file.append_batch(&batch)?;
    }

    // Sanity check: `std::mem::take` above must have emptied the vector.
    if !globally_sorted_batches.is_empty() {
        return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
    }

    Ok(())
}
420
421 async fn spill_finish(&mut self) -> Result<()> {
423 let mut in_progress_file =
424 self.in_progress_spill_file.take().ok_or_else(|| {
425 internal_datafusion_err!("Should be called after `spill_append`")
426 })?;
427 let spill_file = in_progress_file.finish()?;
428
429 if let Some(spill_file) = spill_file {
430 self.finished_spill_files.push(spill_file);
431 }
432
433 Ok(())
434 }
435
436 fn organize_stringview_arrays(
466 globally_sorted_batches: &mut Vec<RecordBatch>,
467 ) -> Result<()> {
468 let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
469
470 for batch in globally_sorted_batches.drain(..) {
471 let mut new_columns: Vec<Arc<dyn Array>> =
472 Vec::with_capacity(batch.num_columns());
473
474 let mut arr_mutated = false;
475 for array in batch.columns() {
476 if let Some(string_view_array) =
477 array.as_any().downcast_ref::<StringViewArray>()
478 {
479 let new_array = string_view_array.gc();
480 new_columns.push(Arc::new(new_array));
481 arr_mutated = true;
482 } else {
483 new_columns.push(Arc::clone(array));
484 }
485 }
486
487 let organized_batch = if arr_mutated {
488 RecordBatch::try_new(batch.schema(), new_columns)?
489 } else {
490 batch
491 };
492
493 organized_batches.push(organized_batch);
494 }
495
496 *globally_sorted_batches = organized_batches;
497
498 Ok(())
499 }
500
/// Sorts everything in `in_mem_batches`, writes the sorted output to a
/// spill file, finishes that file, and re-reserves memory for the merge.
///
/// # Errors
/// Returns an internal error if `in_mem_batches` is empty on entry, or if
/// the buffers are unexpectedly non-empty at the checkpoints below.
/// Propagates sorting, spilling, and reservation errors.
async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
    if self.in_mem_batches.is_empty() {
        return internal_err!(
            "in_mem_batches must not be empty when attempting to sort and spill"
        );
    }

    // Release the merge reservation while sorting; it is re-acquired by
    // `reserve_memory_for_merge` at the end of this function.
    self.merge_reservation.free();

    let mut sorted_stream =
        self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
    // `in_mem_sort_stream` is expected to have consumed every buffered batch.
    if !self.in_mem_batches.is_empty() {
        return internal_err!(
            "in_mem_batches should be empty after constructing sorted stream"
        );
    }
    // Sorted batches accumulated in memory until the next spill.
    let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

    while let Some(batch) = sorted_stream.next().await {
        let batch = batch?;
        let sorted_size = get_reserved_byte_for_record_batch(&batch);
        if self.reservation.try_grow(sorted_size).is_err() {
            // No room to account for this batch: buffer it anyway and flush
            // everything accumulated so far to the spill file, which also
            // frees `self.reservation`.
            globally_sorted_batches.push(batch);
            self.consume_and_spill_append(&mut globally_sorted_batches)
                .await?;
        } else {
            globally_sorted_batches.push(batch);
        }
    }

    // Drop the stream before the final spill.
    drop(sorted_stream);

    // Flush whatever is left and close the spill file.
    self.consume_and_spill_append(&mut globally_sorted_batches)
        .await?;
    self.spill_finish().await?;

    // Sanity check: nothing may remain buffered at this point.
    let buffers_cleared_property =
        self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
    if !buffers_cleared_property {
        return internal_err!(
            "in_mem_batches and globally_sorted_batches should be cleared before"
        );
    }

    // Restore the merge reservation released at the top.
    self.reserve_memory_for_merge()?;

    Ok(())
}
567
/// Consumes `in_mem_batches` and returns a stream of their sorted contents.
///
/// Strategy, checked in this order:
/// 1. nothing buffered: an empty stream with the sorter's schema;
/// 2. exactly one batch: sort that batch directly;
/// 3. `reservation` smaller than `sort_in_place_threshold_bytes`:
///    concatenate all batches and sort the result once;
/// 4. otherwise: sort each batch in its own stream and merge the streams.
///
/// `metrics` is used for the returned stream; in case 4 the per-batch
/// streams use intermediate metrics derived from the sorter's baseline.
///
/// # Errors
/// Propagates concatenation, reservation (with OOM context), and merge
/// construction errors.
fn in_mem_sort_stream(
    &mut self,
    metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream> {
    if self.in_mem_batches.is_empty() {
        return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
            &self.schema,
        ))));
    }

    // Time the work done in this function (not the later stream polling)
    // as elapsed compute.
    let elapsed_compute = metrics.elapsed_compute().clone();
    let _timer = elapsed_compute.timer();

    if self.in_mem_batches.len() == 1 {
        // Single batch: hand it and the sorter's reservation to the stream.
        let batch = self.in_mem_batches.swap_remove(0);
        let reservation = self.reservation.take();
        return self.sort_batch_stream(batch, metrics, reservation);
    }

    if self.reservation.size() < self.sort_in_place_threshold_bytes {
        // Small enough: merge into one batch and sort it in a single pass.
        let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
        self.in_mem_batches.clear();
        // Re-size the reservation to what the concatenated batch requires;
        // `sort_batch_stream` asserts the two match.
        self.reservation
            .try_resize(get_reserved_byte_for_record_batch(&batch))
            .map_err(Self::err_with_oom_context)?;
        let reservation = self.reservation.take();
        return self.sort_batch_stream(batch, metrics, reservation);
    }

    // Large input: one sorted stream per batch, each given its own share
    // of the reservation, wrapped with `spawn_buffered(_, 1)`.
    let streams = std::mem::take(&mut self.in_mem_batches)
        .into_iter()
        .map(|batch| {
            let metrics = self.metrics.baseline.intermediate();
            let reservation = self
                .reservation
                .split(get_reserved_byte_for_record_batch(&batch));
            let input = self.sort_batch_stream(batch, metrics, reservation)?;
            Ok(spawn_buffered(input, 1))
        })
        .collect::<Result<_>>()?;

    let expressions: LexOrdering = self.expr.iter().cloned().collect();

    StreamingMergeBuilder::new()
        .with_streams(streams)
        .with_schema(Arc::clone(&self.schema))
        .with_expressions(expressions.as_ref())
        .with_metrics(metrics)
        .with_batch_size(self.batch_size)
        .with_fetch(None)
        .with_reservation(self.merge_reservation.new_empty())
        .build()
}
689
690 fn sort_batch_stream(
695 &self,
696 batch: RecordBatch,
697 metrics: BaselineMetrics,
698 reservation: MemoryReservation,
699 ) -> Result<SendableRecordBatchStream> {
700 assert_eq!(
701 get_reserved_byte_for_record_batch(&batch),
702 reservation.size()
703 );
704 let schema = batch.schema();
705
706 let expressions: LexOrdering = self.expr.iter().cloned().collect();
707 let stream = futures::stream::once(async move {
708 let _timer = metrics.elapsed_compute().timer();
709
710 let sorted = sort_batch(&batch, &expressions, None)?;
711
712 metrics.record_output(sorted.num_rows());
713 drop(batch);
714 drop(reservation);
715 Ok(sorted)
716 });
717
718 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
719 }
720
721 fn reserve_memory_for_merge(&mut self) -> Result<()> {
725 if self.runtime.disk_manager.tmp_files_enabled() {
727 let size = self.sort_spill_reservation_bytes;
728 if self.merge_reservation.size() != size {
729 self.merge_reservation
730 .try_resize(size)
731 .map_err(Self::err_with_oom_context)?;
732 }
733 }
734
735 Ok(())
736 }
737
738 async fn reserve_memory_for_batch_and_maybe_spill(
741 &mut self,
742 input: &RecordBatch,
743 ) -> Result<()> {
744 let size = get_reserved_byte_for_record_batch(input);
745
746 match self.reservation.try_grow(size) {
747 Ok(_) => Ok(()),
748 Err(e) => {
749 if self.in_mem_batches.is_empty() {
750 return Err(Self::err_with_oom_context(e));
751 }
752
753 self.sort_and_spill_in_mem_batches().await?;
755 self.reservation
756 .try_grow(size)
757 .map_err(Self::err_with_oom_context)
758 }
759 }
760 }
761
762 fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
765 match e {
766 DataFusionError::ResourcesExhausted(_) => e.context(
767 "Not enough memory to continue external sort. \
768 Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
769 ),
770 _ => e,
772 }
773 }
774}
775
776fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
784 get_record_batch_memory_size(batch) * 2
788}
789
790impl Debug for ExternalSorter {
791 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
792 f.debug_struct("ExternalSorter")
793 .field("memory_used", &self.used())
794 .field("spilled_bytes", &self.spilled_bytes())
795 .field("spilled_rows", &self.spilled_rows())
796 .field("spill_count", &self.spill_count())
797 .finish()
798 }
799}
800
801pub fn sort_batch(
802 batch: &RecordBatch,
803 expressions: &LexOrdering,
804 fetch: Option<usize>,
805) -> Result<RecordBatch> {
806 let sort_columns = expressions
807 .iter()
808 .map(|expr| expr.evaluate_to_sort_column(batch))
809 .collect::<Result<Vec<_>>>()?;
810
811 let indices = lexsort_to_indices(&sort_columns, fetch)?;
812 let mut columns = take_arrays(batch.columns(), &indices, None)?;
813
814 columns.iter_mut().for_each(|c| {
819 c.shrink_to_fit();
820 });
821
822 let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
823 Ok(RecordBatch::try_new_with_options(
824 batch.schema(),
825 columns,
826 &options,
827 )?)
828}
829
/// Physical operator that sorts its input by `expr`, optionally keeping
/// only the first `fetch` rows.
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input plan whose output is sorted.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions.
    expr: LexOrdering,
    /// Metrics recorded while executing this operator.
    metrics_set: ExecutionPlanMetricsSet,
    /// When `true`, the input's partitioning is kept; otherwise the output
    /// is a single partition (see `output_partitioning_helper`).
    preserve_partitioning: bool,
    /// Optional row limit; when set, `execute` uses `TopK` or a limit stream.
    fetch: Option<usize>,
    /// Common sort prefix computed by `compute_properties`; passed to `TopK`.
    common_sort_prefix: LexOrdering,
    /// Cached plan properties returned by `properties()`.
    cache: PlanProperties,
}
852
853impl SortExec {
854 pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
857 let preserve_partitioning = false;
858 let (cache, sort_prefix) =
859 Self::compute_properties(&input, expr.clone(), preserve_partitioning);
860 Self {
861 expr,
862 input,
863 metrics_set: ExecutionPlanMetricsSet::new(),
864 preserve_partitioning,
865 fetch: None,
866 common_sort_prefix: sort_prefix,
867 cache,
868 }
869 }
870
/// Whether this operator keeps its input's partitioning.
pub fn preserve_partitioning(&self) -> bool {
    self.preserve_partitioning
}
875
876 pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
884 self.preserve_partitioning = preserve_partitioning;
885 self.cache = self
886 .cache
887 .with_partitioning(Self::output_partitioning_helper(
888 &self.input,
889 self.preserve_partitioning,
890 ));
891 self
892 }
893
894 pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
902 let mut cache = self.cache.clone();
903 let is_pipeline_friendly = matches!(
907 self.cache.emission_type,
908 EmissionType::Incremental | EmissionType::Both
909 );
910 if fetch.is_some() && is_pipeline_friendly {
911 cache = cache.with_boundedness(Boundedness::Bounded);
912 }
913 SortExec {
914 input: Arc::clone(&self.input),
915 expr: self.expr.clone(),
916 metrics_set: self.metrics_set.clone(),
917 preserve_partitioning: self.preserve_partitioning,
918 common_sort_prefix: self.common_sort_prefix.clone(),
919 fetch,
920 cache,
921 }
922 }
923
/// The input plan being sorted.
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
    &self.input
}
928
/// The sort expressions.
pub fn expr(&self) -> &LexOrdering {
    &self.expr
}
933
/// The fetch limit, if one is set.
pub fn fetch(&self) -> Option<usize> {
    self.fetch
}
938
939 fn output_partitioning_helper(
940 input: &Arc<dyn ExecutionPlan>,
941 preserve_partitioning: bool,
942 ) -> Partitioning {
943 if preserve_partitioning {
945 input.output_partitioning().clone()
946 } else {
947 Partitioning::UnknownPartitioning(1)
948 }
949 }
950
951 fn compute_properties(
954 input: &Arc<dyn ExecutionPlan>,
955 sort_exprs: LexOrdering,
956 preserve_partitioning: bool,
957 ) -> (PlanProperties, LexOrdering) {
958 let requirement = LexRequirement::from(sort_exprs);
960
961 let (sort_prefix, sort_satisfied) = input
962 .equivalence_properties()
963 .extract_common_sort_prefix(&requirement);
964
965 let emission_type = if sort_satisfied {
969 input.pipeline_behavior()
970 } else {
971 EmissionType::Final
972 };
973
974 let boundedness = if sort_satisfied {
980 input.boundedness()
981 } else {
982 match input.boundedness() {
983 Boundedness::Unbounded { .. } => Boundedness::Unbounded {
984 requires_infinite_memory: true,
985 },
986 bounded => bounded,
987 }
988 };
989
990 let sort_exprs = LexOrdering::from(requirement);
993 let eq_properties = input
994 .equivalence_properties()
995 .clone()
996 .with_reorder(sort_exprs);
997
998 let output_partitioning =
1000 Self::output_partitioning_helper(input, preserve_partitioning);
1001
1002 (
1003 PlanProperties::new(
1004 eq_properties,
1005 output_partitioning,
1006 emission_type,
1007 boundedness,
1008 ),
1009 LexOrdering::from(sort_prefix),
1010 )
1011 }
1012}
1013
1014impl DisplayAs for SortExec {
1015 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1016 match t {
1017 DisplayFormatType::Default | DisplayFormatType::Verbose => {
1018 let preserve_partitioning = self.preserve_partitioning;
1019 match self.fetch {
1020 Some(fetch) => {
1021 write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
1022 if !self.common_sort_prefix.is_empty() {
1023 write!(f, ", sort_prefix=[{}]", self.common_sort_prefix)
1024 } else {
1025 Ok(())
1026 }
1027 }
1028 None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
1029 }
1030 }
1031 DisplayFormatType::TreeRender => match self.fetch {
1032 Some(fetch) => {
1033 writeln!(f, "{}", self.expr)?;
1034 writeln!(f, "limit={fetch}")
1035 }
1036 None => {
1037 writeln!(f, "{}", self.expr)
1038 }
1039 },
1040 }
1041 }
1042}
1043
1044impl ExecutionPlan for SortExec {
1045 fn name(&self) -> &'static str {
1046 match self.fetch {
1047 Some(_) => "SortExec(TopK)",
1048 None => "SortExec",
1049 }
1050 }
1051
/// Returns `self` as `&dyn Any`.
fn as_any(&self) -> &dyn Any {
    self
}
1055
/// Returns the cached plan properties.
fn properties(&self) -> &PlanProperties {
    &self.cache
}
1059
1060 fn required_input_distribution(&self) -> Vec<Distribution> {
1061 if self.preserve_partitioning {
1062 vec![Distribution::UnspecifiedDistribution]
1063 } else {
1064 vec![Distribution::SinglePartition]
1067 }
1068 }
1069
/// Returns the single input plan.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
    vec![&self.input]
}
1073
/// Reports `false` for the single input.
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
    vec![false]
}
1077
1078 fn with_new_children(
1079 self: Arc<Self>,
1080 children: Vec<Arc<dyn ExecutionPlan>>,
1081 ) -> Result<Arc<dyn ExecutionPlan>> {
1082 let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
1083 .with_fetch(self.fetch)
1084 .with_preserve_partitioning(self.preserve_partitioning);
1085
1086 Ok(Arc::new(new_sort))
1087 }
1088
1089 fn execute(
1090 &self,
1091 partition: usize,
1092 context: Arc<TaskContext>,
1093 ) -> Result<SendableRecordBatchStream> {
1094 trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
1095
1096 let mut input = self.input.execute(partition, Arc::clone(&context))?;
1097
1098 let execution_options = &context.session_config().options().execution;
1099
1100 trace!("End SortExec's input.execute for partition: {partition}");
1101
1102 let requirement = &LexRequirement::from(self.expr.clone());
1103
1104 let sort_satisfied = self
1105 .input
1106 .equivalence_properties()
1107 .ordering_satisfy_requirement(requirement);
1108
1109 match (sort_satisfied, self.fetch.as_ref()) {
1110 (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
1111 input,
1112 0,
1113 Some(*fetch),
1114 BaselineMetrics::new(&self.metrics_set, partition),
1115 ))),
1116 (true, None) => Ok(input),
1117 (false, Some(fetch)) => {
1118 let mut topk = TopK::try_new(
1119 partition,
1120 input.schema(),
1121 self.common_sort_prefix.clone(),
1122 self.expr.clone(),
1123 *fetch,
1124 context.session_config().batch_size(),
1125 context.runtime_env(),
1126 &self.metrics_set,
1127 )?;
1128 Ok(Box::pin(RecordBatchStreamAdapter::new(
1129 self.schema(),
1130 futures::stream::once(async move {
1131 while let Some(batch) = input.next().await {
1132 let batch = batch?;
1133 topk.insert_batch(batch)?;
1134 if topk.finished {
1135 break;
1136 }
1137 }
1138 topk.emit()
1139 })
1140 .try_flatten(),
1141 )))
1142 }
1143 (false, None) => {
1144 let mut sorter = ExternalSorter::new(
1145 partition,
1146 input.schema(),
1147 self.expr.clone(),
1148 context.session_config().batch_size(),
1149 execution_options.sort_spill_reservation_bytes,
1150 execution_options.sort_in_place_threshold_bytes,
1151 &self.metrics_set,
1152 context.runtime_env(),
1153 )?;
1154 Ok(Box::pin(RecordBatchStreamAdapter::new(
1155 self.schema(),
1156 futures::stream::once(async move {
1157 while let Some(batch) = input.next().await {
1158 let batch = batch?;
1159 sorter.insert_batch(batch).await?;
1160 }
1161 sorter.sort().await
1162 })
1163 .try_flatten(),
1164 )))
1165 }
1166 }
1167 }
1168
/// Returns a snapshot of this operator's metrics (`clone_inner` of the set).
fn metrics(&self) -> Option<MetricsSet> {
    Some(self.metrics_set.clone_inner())
}
1172
/// Statistics for the whole output; delegates to `partition_statistics(None)`.
fn statistics(&self) -> Result<Statistics> {
    self.partition_statistics(None)
}
1176
1177 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1178 if !self.preserve_partitioning() {
1179 return self.input.partition_statistics(None)?.with_fetch(
1180 self.schema(),
1181 self.fetch,
1182 0,
1183 1,
1184 );
1185 }
1186 self.input.partition_statistics(partition)?.with_fetch(
1187 self.schema(),
1188 self.fetch,
1189 0,
1190 1,
1191 )
1192 }
1193
/// Returns a new `SortExec` with the fetch limit set to `limit`, built by
/// the inherent `SortExec::with_fetch`.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    Some(Arc::new(SortExec::with_fetch(self, limit)))
}
1197
/// The fetch limit, if one is set.
fn fetch(&self) -> Option<usize> {
    self.fetch
}
1201
1202 fn cardinality_effect(&self) -> CardinalityEffect {
1203 if self.fetch.is_none() {
1204 CardinalityEffect::Equal
1205 } else {
1206 CardinalityEffect::LowerEqual
1207 }
1208 }
1209
1210 fn try_swapping_with_projection(
1214 &self,
1215 projection: &ProjectionExec,
1216 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1217 if projection.expr().len() >= projection.input().schema().fields().len() {
1219 return Ok(None);
1220 }
1221
1222 let mut updated_exprs = LexOrdering::default();
1223 for sort in self.expr() {
1224 let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)?
1225 else {
1226 return Ok(None);
1227 };
1228 updated_exprs.push(PhysicalSortExpr {
1229 expr: new_expr,
1230 options: sort.options,
1231 });
1232 }
1233
1234 Ok(Some(Arc::new(
1235 SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
1236 .with_fetch(self.fetch())
1237 .with_preserve_partitioning(self.preserve_partitioning()),
1238 )))
1239 }
1240}
1241
1242#[cfg(test)]
1243mod tests {
1244 use std::collections::HashMap;
1245 use std::pin::Pin;
1246 use std::task::{Context, Poll};
1247
1248 use super::*;
1249 use crate::coalesce_partitions::CoalescePartitionsExec;
1250 use crate::collect;
1251 use crate::execution_plan::Boundedness;
1252 use crate::expressions::col;
1253 use crate::test;
1254 use crate::test::assert_is_pending;
1255 use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
1256 use crate::test::TestMemoryExec;
1257
1258 use arrow::array::*;
1259 use arrow::compute::SortOptions;
1260 use arrow::datatypes::*;
1261 use datafusion_common::cast::as_primitive_array;
1262 use datafusion_common::test_util::batches_to_string;
1263 use datafusion_common::{DataFusionError, Result, ScalarValue};
1264 use datafusion_execution::config::SessionConfig;
1265 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1266 use datafusion_execution::RecordBatchStream;
1267 use datafusion_physical_expr::expressions::{Column, Literal};
1268 use datafusion_physical_expr::EquivalenceProperties;
1269
1270 use futures::{FutureExt, Stream};
1271 use insta::assert_snapshot;
1272
/// Test-only plan whose `execute` returns a `SortedUnboundedStream`.
#[derive(Debug, Clone)]
pub struct SortedUnboundedExec {
    /// Schema of the produced batches.
    schema: Schema,
    /// Number of rows in each produced batch.
    batch_size: u64,
    /// Plan properties returned by `properties()`.
    cache: PlanProperties,
}
1279
1280 impl DisplayAs for SortedUnboundedExec {
1281 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1282 match t {
1283 DisplayFormatType::Default
1284 | DisplayFormatType::Verbose
1285 | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1286 }
1287 Ok(())
1288 }
1289 }
1290
1291 impl SortedUnboundedExec {
1292 fn compute_properties(schema: SchemaRef) -> PlanProperties {
1293 let mut eq_properties = EquivalenceProperties::new(schema);
1294 eq_properties.add_new_orderings(vec![LexOrdering::new(vec![
1295 PhysicalSortExpr::new_default(Arc::new(Column::new("c1", 0))),
1296 ])]);
1297 PlanProperties::new(
1298 eq_properties,
1299 Partitioning::UnknownPartitioning(1),
1300 EmissionType::Final,
1301 Boundedness::Unbounded {
1302 requires_infinite_memory: false,
1303 },
1304 )
1305 }
1306 }
1307
1308 impl ExecutionPlan for SortedUnboundedExec {
1309 fn name(&self) -> &'static str {
1310 Self::static_name()
1311 }
1312
1313 fn as_any(&self) -> &dyn Any {
1314 self
1315 }
1316
1317 fn properties(&self) -> &PlanProperties {
1318 &self.cache
1319 }
1320
1321 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1322 vec![]
1323 }
1324
1325 fn with_new_children(
1326 self: Arc<Self>,
1327 _: Vec<Arc<dyn ExecutionPlan>>,
1328 ) -> Result<Arc<dyn ExecutionPlan>> {
1329 Ok(self)
1330 }
1331
1332 fn execute(
1333 &self,
1334 _partition: usize,
1335 _context: Arc<TaskContext>,
1336 ) -> Result<SendableRecordBatchStream> {
1337 Ok(Box::pin(SortedUnboundedStream {
1338 schema: Arc::new(self.schema.clone()),
1339 batch_size: self.batch_size,
1340 offset: 0,
1341 }))
1342 }
1343 }
1344
/// Test-only stream that never ends; each poll yields a batch of
/// consecutive `u64` values continuing from `offset`.
#[derive(Debug)]
pub struct SortedUnboundedStream {
    /// Schema of the produced batches.
    schema: SchemaRef,
    /// Number of rows per batch.
    batch_size: u64,
    /// First value of the next batch; advanced by `batch_size` on each poll.
    offset: u64,
}
1351
1352 impl Stream for SortedUnboundedStream {
1353 type Item = Result<RecordBatch>;
1354
1355 fn poll_next(
1356 mut self: Pin<&mut Self>,
1357 _cx: &mut Context<'_>,
1358 ) -> Poll<Option<Self::Item>> {
1359 let batch = SortedUnboundedStream::create_record_batch(
1360 Arc::clone(&self.schema),
1361 self.offset,
1362 self.batch_size,
1363 );
1364 self.offset += self.batch_size;
1365 Poll::Ready(Some(Ok(batch)))
1366 }
1367 }
1368
impl RecordBatchStream for SortedUnboundedStream {
    /// Returns a new reference to the stream's schema.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
1374
1375 impl SortedUnboundedStream {
1376 fn create_record_batch(
1377 schema: SchemaRef,
1378 offset: u64,
1379 batch_size: u64,
1380 ) -> RecordBatch {
1381 let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1382 let array = UInt64Array::from(values);
1383 let array_ref: ArrayRef = Arc::new(array);
1384 RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1385 }
1386 }
1387
1388 #[tokio::test]
1389 async fn test_in_mem_sort() -> Result<()> {
1390 let task_ctx = Arc::new(TaskContext::default());
1391 let partitions = 4;
1392 let csv = test::scan_partitioned(partitions);
1393 let schema = csv.schema();
1394
1395 let sort_exec = Arc::new(SortExec::new(
1396 LexOrdering::new(vec![PhysicalSortExpr {
1397 expr: col("i", &schema)?,
1398 options: SortOptions::default(),
1399 }]),
1400 Arc::new(CoalescePartitionsExec::new(csv)),
1401 ));
1402
1403 let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
1404
1405 assert_eq!(result.len(), 1);
1406 assert_eq!(result[0].num_rows(), 400);
1407
1408 assert_eq!(
1409 task_ctx.runtime_env().memory_pool.reserved(),
1410 0,
1411 "The sort should have returned all memory used back to the memory manager"
1412 );
1413
1414 Ok(())
1415 }
1416
/// Sorts 100 coalesced partitions under a memory limit only slightly above
/// the merge reservation and checks the output, the spill metrics, and
/// that all memory is released afterwards.
#[tokio::test]
async fn test_sort_spill() -> Result<()> {
    let session_config = SessionConfig::new();
    // Memory limit = configured merge reservation + 12288 bytes for batches.
    let sort_spill_reservation_bytes = session_config
        .options()
        .execution
        .sort_spill_reservation_bytes;
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
        .build_arc()?;
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    );

    let partitions = 100;
    let input = test::scan_partitioned(partitions);
    let schema = input.schema();

    let sort_exec = Arc::new(SortExec::new(
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("i", &schema)?,
            options: SortOptions::default(),
        }]),
        Arc::new(CoalescePartitionsExec::new(input)),
    ));

    let result = collect(
        Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
        Arc::clone(&task_ctx),
    )
    .await?;

    assert_eq!(result.len(), 2);

    let metrics = sort_exec.metrics().unwrap();

    assert_eq!(metrics.output_rows().unwrap(), 10000);
    assert!(metrics.elapsed_compute().unwrap() > 0);

    // Spill metrics are checked against ranges rather than exact values.
    let spill_count = metrics.spill_count().unwrap();
    let spilled_rows = metrics.spilled_rows().unwrap();
    let spilled_bytes = metrics.spilled_bytes().unwrap();
    assert!((3..=10).contains(&spill_count));
    assert!((9000..=10000).contains(&spilled_rows));
    assert!((38000..=42000).contains(&spilled_bytes));

    let columns = result[0].columns();

    // First and last value of the first output batch.
    let i = as_primitive_array::<Int32Type>(&columns[0])?;
    assert_eq!(i.value(0), 0);
    assert_eq!(i.value(i.len() - 1), 81);

    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "The sort should have returned all memory used back to the memory manager"
    );

    Ok(())
}
1487
/// Checks that the sort fails with a `ResourcesExhausted` error wrapped in
/// a `Context` error when the memory limit is one byte short of the
/// reservation needed for the first batch.
#[tokio::test]
async fn test_batch_reservation_error() -> Result<()> {
    // Reservation expected for the first input batch (verified below).
    let expected_batch_reservation = 800;
    // No merge reservation, so the limit is exactly one byte too small.
    let merge_reservation: usize = 0;
    let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;
    let session_config =
        SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(memory_limit, 1.0)
        .build_arc()?;
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    );

    let plan = test::scan_partitioned(1);

    {
        // Verify the assumptions above against the actual first batch.
        let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
        let first_batch = stream.next().await.unwrap()?;
        let batch_reservation = get_reserved_byte_for_record_batch(&first_batch);

        assert_eq!(batch_reservation, expected_batch_reservation);
        assert!(memory_limit < (merge_reservation + batch_reservation));
    }

    let sort_exec = Arc::new(SortExec::new(
        LexOrdering::new(vec![PhysicalSortExpr {
            expr: col("i", &plan.schema())?,
            options: SortOptions::default(),
        }]),
        plan,
    ));

    let result = collect(
        Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
        Arc::clone(&task_ctx),
    )
    .await;

    // The outer error must be the added context...
    let err = result.unwrap_err();
    assert!(
        matches!(err, DataFusionError::Context(..)),
        "Assertion failed: expected a Context error, but got: {err:?}"
    );

    // ...and its root cause the memory exhaustion itself.
    assert!(
        matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
        "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
    );

    Ok(())
}
1548
1549 #[tokio::test]
1550 async fn test_sort_spill_utf8_strings() -> Result<()> {
1551 let session_config = SessionConfig::new()
1552 .with_batch_size(100)
1553 .with_sort_in_place_threshold_bytes(20 * 1024)
1554 .with_sort_spill_reservation_bytes(100 * 1024);
1555 let runtime = RuntimeEnvBuilder::new()
1556 .with_memory_limit(500 * 1024, 1.0)
1557 .build_arc()?;
1558 let task_ctx = Arc::new(
1559 TaskContext::default()
1560 .with_session_config(session_config)
1561 .with_runtime(runtime),
1562 );
1563
1564 let input = test::scan_partitioned_utf8(200);
1568 let schema = input.schema();
1569
1570 let sort_exec = Arc::new(SortExec::new(
1571 LexOrdering::new(vec![PhysicalSortExpr {
1572 expr: col("i", &schema)?,
1573 options: SortOptions::default(),
1574 }]),
1575 Arc::new(CoalescePartitionsExec::new(input)),
1576 ));
1577
1578 let result = collect(
1579 Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
1580 Arc::clone(&task_ctx),
1581 )
1582 .await?;
1583
1584 let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
1585 assert_eq!(num_rows, 20000);
1586
1587 let metrics = sort_exec.metrics().unwrap();
1589
1590 assert_eq!(metrics.output_rows().unwrap(), 20000);
1591 assert!(metrics.elapsed_compute().unwrap() > 0);
1592
1593 let spill_count = metrics.spill_count().unwrap();
1594 let spilled_rows = metrics.spilled_rows().unwrap();
1595 let spilled_bytes = metrics.spilled_bytes().unwrap();
1596
1597 assert!((4..=8).contains(&spill_count));
1611 assert!((15000..=20000).contains(&spilled_rows));
1612 assert!((900000..=1000000).contains(&spilled_bytes));
1613
1614 let concated_result = concat_batches(&schema, &result)?;
1616 let columns = concated_result.columns();
1617 let string_array = as_string_array(&columns[0]);
1618 for i in 0..string_array.len() - 1 {
1619 assert!(string_array.value(i) <= string_array.value(i + 1));
1620 }
1621
1622 assert_eq!(
1623 task_ctx.runtime_env().memory_pool.reserved(),
1624 0,
1625 "The sort should have returned all memory used back to the memory manager"
1626 );
1627
1628 Ok(())
1629 }
1630
1631 #[tokio::test]
1632 async fn test_sort_fetch_memory_calculation() -> Result<()> {
1633 let avg_batch_size = 400;
1635 let partitions = 4;
1636
1637 let test_options = vec![
1639 (None, true),
1642 (Some(1), false),
1645 ];
1646
1647 for (fetch, expect_spillage) in test_options {
1648 let session_config = SessionConfig::new();
1649 let sort_spill_reservation_bytes = session_config
1650 .options()
1651 .execution
1652 .sort_spill_reservation_bytes;
1653
1654 let runtime = RuntimeEnvBuilder::new()
1655 .with_memory_limit(
1656 sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1657 1.0,
1658 )
1659 .build_arc()?;
1660 let task_ctx = Arc::new(
1661 TaskContext::default()
1662 .with_runtime(runtime)
1663 .with_session_config(session_config),
1664 );
1665
1666 let csv = test::scan_partitioned(partitions);
1667 let schema = csv.schema();
1668
1669 let sort_exec = Arc::new(
1670 SortExec::new(
1671 LexOrdering::new(vec![PhysicalSortExpr {
1672 expr: col("i", &schema)?,
1673 options: SortOptions::default(),
1674 }]),
1675 Arc::new(CoalescePartitionsExec::new(csv)),
1676 )
1677 .with_fetch(fetch),
1678 );
1679
1680 let result = collect(
1681 Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
1682 Arc::clone(&task_ctx),
1683 )
1684 .await?;
1685 assert_eq!(result.len(), 1);
1686
1687 let metrics = sort_exec.metrics().unwrap();
1688 let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
1689 assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
1690 }
1691 Ok(())
1692 }
1693
1694 #[tokio::test]
1695 async fn test_sort_metadata() -> Result<()> {
1696 let task_ctx = Arc::new(TaskContext::default());
1697 let field_metadata: HashMap<String, String> =
1698 vec![("foo".to_string(), "bar".to_string())]
1699 .into_iter()
1700 .collect();
1701 let schema_metadata: HashMap<String, String> =
1702 vec![("baz".to_string(), "barf".to_string())]
1703 .into_iter()
1704 .collect();
1705
1706 let mut field = Field::new("field_name", DataType::UInt64, true);
1707 field.set_metadata(field_metadata.clone());
1708 let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1709 let schema = Arc::new(schema);
1710
1711 let data: ArrayRef =
1712 Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1713
1714 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data]).unwrap();
1715 let input =
1716 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
1717 .unwrap();
1718
1719 let sort_exec = Arc::new(SortExec::new(
1720 LexOrdering::new(vec![PhysicalSortExpr {
1721 expr: col("field_name", &schema)?,
1722 options: SortOptions::default(),
1723 }]),
1724 input,
1725 ));
1726
1727 let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1728
1729 let expected_data: ArrayRef =
1730 Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1731 let expected_batch =
1732 RecordBatch::try_new(Arc::clone(&schema), vec![expected_data]).unwrap();
1733
1734 assert_eq!(&vec![expected_batch], &result);
1736
1737 assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
1739 assert_eq!(result[0].schema().metadata(), &schema_metadata);
1740
1741 Ok(())
1742 }
1743
1744 #[tokio::test]
1745 async fn test_lex_sort_by_mixed_types() -> Result<()> {
1746 let task_ctx = Arc::new(TaskContext::default());
1747 let schema = Arc::new(Schema::new(vec![
1748 Field::new("a", DataType::Int32, true),
1749 Field::new(
1750 "b",
1751 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1752 true,
1753 ),
1754 ]));
1755
1756 let batch = RecordBatch::try_new(
1758 Arc::clone(&schema),
1759 vec![
1760 Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
1761 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1762 Some(vec![Some(3)]),
1763 Some(vec![Some(1)]),
1764 Some(vec![Some(6), None]),
1765 Some(vec![Some(5)]),
1766 ])),
1767 ],
1768 )?;
1769
1770 let sort_exec = Arc::new(SortExec::new(
1771 LexOrdering::new(vec![
1772 PhysicalSortExpr {
1773 expr: col("a", &schema)?,
1774 options: SortOptions {
1775 descending: false,
1776 nulls_first: true,
1777 },
1778 },
1779 PhysicalSortExpr {
1780 expr: col("b", &schema)?,
1781 options: SortOptions {
1782 descending: true,
1783 nulls_first: false,
1784 },
1785 },
1786 ]),
1787 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
1788 ));
1789
1790 assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
1791 assert_eq!(
1792 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1793 *sort_exec.schema().field(1).data_type()
1794 );
1795
1796 let result: Vec<RecordBatch> =
1797 collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
1798 let metrics = sort_exec.metrics().unwrap();
1799 assert!(metrics.elapsed_compute().unwrap() > 0);
1800 assert_eq!(metrics.output_rows().unwrap(), 4);
1801 assert_eq!(result.len(), 1);
1802
1803 let expected = RecordBatch::try_new(
1804 schema,
1805 vec![
1806 Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
1807 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1808 Some(vec![Some(1)]),
1809 Some(vec![Some(6), None]),
1810 Some(vec![Some(5)]),
1811 Some(vec![Some(3)]),
1812 ])),
1813 ],
1814 )?;
1815
1816 assert_eq!(expected, result[0]);
1817
1818 Ok(())
1819 }
1820
1821 #[tokio::test]
1822 async fn test_lex_sort_by_float() -> Result<()> {
1823 let task_ctx = Arc::new(TaskContext::default());
1824 let schema = Arc::new(Schema::new(vec![
1825 Field::new("a", DataType::Float32, true),
1826 Field::new("b", DataType::Float64, true),
1827 ]));
1828
1829 let batch = RecordBatch::try_new(
1831 Arc::clone(&schema),
1832 vec![
1833 Arc::new(Float32Array::from(vec![
1834 Some(f32::NAN),
1835 None,
1836 None,
1837 Some(f32::NAN),
1838 Some(1.0_f32),
1839 Some(1.0_f32),
1840 Some(2.0_f32),
1841 Some(3.0_f32),
1842 ])),
1843 Arc::new(Float64Array::from(vec![
1844 Some(200.0_f64),
1845 Some(20.0_f64),
1846 Some(10.0_f64),
1847 Some(100.0_f64),
1848 Some(f64::NAN),
1849 None,
1850 None,
1851 Some(f64::NAN),
1852 ])),
1853 ],
1854 )?;
1855
1856 let sort_exec = Arc::new(SortExec::new(
1857 LexOrdering::new(vec![
1858 PhysicalSortExpr {
1859 expr: col("a", &schema)?,
1860 options: SortOptions {
1861 descending: true,
1862 nulls_first: true,
1863 },
1864 },
1865 PhysicalSortExpr {
1866 expr: col("b", &schema)?,
1867 options: SortOptions {
1868 descending: false,
1869 nulls_first: false,
1870 },
1871 },
1872 ]),
1873 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
1874 ));
1875
1876 assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
1877 assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
1878
1879 let result: Vec<RecordBatch> =
1880 collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
1881 let metrics = sort_exec.metrics().unwrap();
1882 assert!(metrics.elapsed_compute().unwrap() > 0);
1883 assert_eq!(metrics.output_rows().unwrap(), 8);
1884 assert_eq!(result.len(), 1);
1885
1886 let columns = result[0].columns();
1887
1888 assert_eq!(DataType::Float32, *columns[0].data_type());
1889 assert_eq!(DataType::Float64, *columns[1].data_type());
1890
1891 let a = as_primitive_array::<Float32Type>(&columns[0])?;
1892 let b = as_primitive_array::<Float64Type>(&columns[1])?;
1893
1894 let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
1896 .map(|i| {
1897 let aval = if a.is_valid(i) {
1898 Some(a.value(i).to_string())
1899 } else {
1900 None
1901 };
1902 let bval = if b.is_valid(i) {
1903 Some(b.value(i).to_string())
1904 } else {
1905 None
1906 };
1907 (aval, bval)
1908 })
1909 .collect();
1910
1911 let expected: Vec<(Option<String>, Option<String>)> = vec![
1912 (None, Some("10".to_owned())),
1913 (None, Some("20".to_owned())),
1914 (Some("NaN".to_owned()), Some("100".to_owned())),
1915 (Some("NaN".to_owned()), Some("200".to_owned())),
1916 (Some("3".to_owned()), Some("NaN".to_owned())),
1917 (Some("2".to_owned()), None),
1918 (Some("1".to_owned()), Some("NaN".to_owned())),
1919 (Some("1".to_owned()), None),
1920 ];
1921
1922 assert_eq!(expected, result);
1923
1924 Ok(())
1925 }
1926
1927 #[tokio::test]
1928 async fn test_drop_cancel() -> Result<()> {
1929 let task_ctx = Arc::new(TaskContext::default());
1930 let schema =
1931 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
1932
1933 let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
1934 let refs = blocking_exec.refs();
1935 let sort_exec = Arc::new(SortExec::new(
1936 LexOrdering::new(vec![PhysicalSortExpr {
1937 expr: col("a", &schema)?,
1938 options: SortOptions::default(),
1939 }]),
1940 blocking_exec,
1941 ));
1942
1943 let fut = collect(sort_exec, Arc::clone(&task_ctx));
1944 let mut fut = fut.boxed();
1945
1946 assert_is_pending(&mut fut);
1947 drop(fut);
1948 assert_strong_count_converges_to_zero(refs).await;
1949
1950 assert_eq!(
1951 task_ctx.runtime_env().memory_pool.reserved(),
1952 0,
1953 "The sort should have returned all memory used back to the memory manager"
1954 );
1955
1956 Ok(())
1957 }
1958
/// Checks that `sort_batch` handles a batch that has no columns but a
/// non-zero row count: the single row must still be present in the output.
#[test]
fn test_empty_sort_batch() {
    // A column-less batch needs its row count supplied explicitly.
    let schema = Arc::new(Schema::empty());
    let batch_options = RecordBatchOptions::new().with_row_count(Some(1));
    let batch =
        RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &batch_options)
            .unwrap();

    // With no columns available, the sort key is a constant literal.
    let literal_key = PhysicalSortExpr {
        expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
        options: SortOptions::default(),
    };
    let expressions = LexOrdering::new(vec![literal_key]);

    let sorted = sort_batch(&batch, expressions.as_ref(), None).unwrap();
    assert_eq!(sorted.num_rows(), 1);
}
1975
/// Runs a `SortExec` with `fetch = Some(9)` on column `c1` over a
/// `SortedUnboundedExec` source and snapshot-checks that the collected output
/// is the nine values 0 through 8.
#[tokio::test]
async fn topk_unbounded_source() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
    // Source emitting batches of two rows over a single non-nullable UInt64 column.
    let source = SortedUnboundedExec {
        schema: schema.clone(),
        batch_size: 2,
        cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
    };
    let mut plan = SortExec::new(
        LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
            "c1", 0,
        )))]),
        Arc::new(source),
    );
    // NOTE(review): presumably the fetch limit is what allows the sort to
    // finish on an unbounded input — confirm against `SortedUnboundedExec`.
    plan = plan.with_fetch(Some(9));

    let batches = collect(Arc::new(plan), task_ctx).await?;
    // Inline snapshot of the pretty-printed result: exactly nine rows.
    assert_snapshot!(batches_to_string(&batches), @r#"
    +----+
    | c1 |
    +----+
    | 0 |
    | 1 |
    | 2 |
    | 3 |
    | 4 |
    | 5 |
    | 6 |
    | 7 |
    | 8 |
    +----+
    "#);
    Ok(())
}
2011}