1use std::fmt;
23use std::fmt::{Debug, Formatter};
24use std::sync::Arc;
25
26use parking_lot::RwLock;
27
28use crate::common::spawn_buffered;
29use crate::execution_plan::{
30 Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
31};
32use crate::expressions::PhysicalSortExpr;
33use crate::filter::FilterExec;
34use crate::filter_pushdown::{
35 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
36 FilterPushdownPropagation, PushedDown,
37};
38use crate::limit::LimitStream;
39use crate::metrics::{
40 BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
41};
42use crate::projection::{ProjectionExec, make_with_child, update_ordering};
43use crate::sorts::IncrementalSortIterator;
44use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
45use crate::spill::get_record_batch_memory_size;
46use crate::spill::in_progress_spill_file::InProgressSpillFile;
47use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
48use crate::stream::RecordBatchStreamAdapter;
49use crate::stream::ReservationStream;
50use crate::topk::TopK;
51use crate::topk::TopKDynamicFilters;
52use crate::{
53 DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
54 ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
55 Statistics,
56};
57
58use arrow::array::{RecordBatch, RecordBatchOptions};
59use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
60use arrow::datatypes::SchemaRef;
61use datafusion_common::config::SpillCompression;
62use datafusion_common::{
63 DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
64 unwrap_or_internal_err,
65};
66use datafusion_execution::TaskContext;
67use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
68use datafusion_execution::runtime_env::RuntimeEnv;
69use datafusion_physical_expr::LexOrdering;
70use datafusion_physical_expr::PhysicalExpr;
71use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
72
73use futures::{StreamExt, TryStreamExt};
74use log::{debug, trace};
75
/// Execution metrics recorded by `ExternalSorter`.
struct ExternalSorterMetrics {
    /// Baseline metrics for the sort (elapsed compute time, output rows, ...).
    baseline: BaselineMetrics,

    /// Metrics describing the data written to spill files.
    spill_metrics: SpillMetrics,
}
82
83impl ExternalSorterMetrics {
84 fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
85 Self {
86 baseline: BaselineMetrics::new(metrics, partition),
87 spill_metrics: SpillMetrics::new(metrics, partition),
88 }
89 }
90}
91
/// Sorts all input batches of one partition, spilling sorted runs to disk
/// when the memory reservation cannot grow.
///
/// Unsorted batches are buffered in `in_mem_batches`. When memory runs out,
/// the buffered batches are sorted and appended to a spill file. After all
/// input has been inserted, `sort` either sorts the in-memory data directly
/// or, if anything was spilled, merges all spill files with a streaming merge.
struct ExternalSorter {
    /// Schema of the input (and output) batches.
    schema: SchemaRef,
    /// Sort expressions.
    expr: LexOrdering,
    /// Target batch size (in rows) for sorted output.
    batch_size: usize,
    /// When the bytes reserved for buffered batches are below this threshold,
    /// the batches are concatenated and sorted as a single batch instead of
    /// being sorted individually and merged.
    sort_in_place_threshold_bytes: usize,

    /// Unsorted batches buffered in memory.
    in_mem_batches: Vec<RecordBatch>,

    /// Spill file currently being written, paired with the largest value
    /// returned by `append_batch` for the batches written to it so far
    /// (recorded as `max_record_batch_memory` when the file is finished).
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    /// Fully written, sorted spill files.
    finished_spill_files: Vec<SortedSpillFile>,

    /// Execution metrics.
    metrics: ExternalSorterMetrics,
    /// Runtime environment providing the memory pool and disk manager.
    runtime: Arc<RuntimeEnv>,
    /// Reservation covering buffered and sorted batches held in memory.
    reservation: MemoryReservation,
    /// Creates spill files (with the configured compression).
    spill_manager: SpillManager,

    /// Headroom reserved (when temporary files are enabled) so that a later
    /// merge of spill files has memory available.
    merge_reservation: MemoryReservation,
    /// Number of bytes `merge_reservation` is resized to.
    sort_spill_reservation_bytes: usize,
}
265
266impl ExternalSorter {
267 #[expect(clippy::too_many_arguments)]
270 pub fn new(
271 partition_id: usize,
272 schema: SchemaRef,
273 expr: LexOrdering,
274 batch_size: usize,
275 sort_spill_reservation_bytes: usize,
276 sort_in_place_threshold_bytes: usize,
277 spill_compression: SpillCompression,
279 metrics: &ExecutionPlanMetricsSet,
280 runtime: Arc<RuntimeEnv>,
281 ) -> Result<Self> {
282 let metrics = ExternalSorterMetrics::new(metrics, partition_id);
283 let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
284 .with_can_spill(true)
285 .register(&runtime.memory_pool);
286
287 let merge_reservation =
288 MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
289 .register(&runtime.memory_pool);
290
291 let spill_manager = SpillManager::new(
292 Arc::clone(&runtime),
293 metrics.spill_metrics.clone(),
294 Arc::clone(&schema),
295 )
296 .with_compression_type(spill_compression);
297
298 Ok(Self {
299 schema,
300 in_mem_batches: vec![],
301 in_progress_spill_file: None,
302 finished_spill_files: vec![],
303 expr,
304 metrics,
305 reservation,
306 spill_manager,
307 merge_reservation,
308 runtime,
309 batch_size,
310 sort_spill_reservation_bytes,
311 sort_in_place_threshold_bytes,
312 })
313 }
314
315 async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
319 if input.num_rows() == 0 {
320 return Ok(());
321 }
322
323 self.reserve_memory_for_merge()?;
324 self.reserve_memory_for_batch_and_maybe_spill(&input)
325 .await?;
326
327 self.in_mem_batches.push(input);
328 Ok(())
329 }
330
331 fn spilled_before(&self) -> bool {
332 !self.finished_spill_files.is_empty()
333 }
334
335 async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
345 if self.spilled_before() {
346 if !self.in_mem_batches.is_empty() {
350 self.sort_and_spill_in_mem_batches().await?;
351 }
352
353 StreamingMergeBuilder::new()
361 .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
362 .with_spill_manager(self.spill_manager.clone())
363 .with_schema(Arc::clone(&self.schema))
364 .with_expressions(&self.expr.clone())
365 .with_metrics(self.metrics.baseline.clone())
366 .with_batch_size(self.batch_size)
367 .with_fetch(None)
368 .with_reservation(self.merge_reservation.take())
369 .build()
370 } else {
371 self.merge_reservation.free();
376 self.in_mem_sort_stream(self.metrics.baseline.clone())
377 }
378 }
379
380 fn used(&self) -> usize {
382 self.reservation.size()
383 }
384
385 #[cfg(test)]
387 fn merge_reservation_size(&self) -> usize {
388 self.merge_reservation.size()
389 }
390
391 fn spilled_bytes(&self) -> usize {
393 self.metrics.spill_metrics.spilled_bytes.value()
394 }
395
396 fn spilled_rows(&self) -> usize {
398 self.metrics.spill_metrics.spilled_rows.value()
399 }
400
401 fn spill_count(&self) -> usize {
403 self.metrics.spill_metrics.spill_file_count.value()
404 }
405
406 async fn consume_and_spill_append(
409 &mut self,
410 globally_sorted_batches: &mut Vec<RecordBatch>,
411 ) -> Result<()> {
412 if globally_sorted_batches.is_empty() {
413 return Ok(());
414 }
415
416 if self.in_progress_spill_file.is_none() {
418 self.in_progress_spill_file =
419 Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
420 }
421
422 debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
423
424 let batches_to_spill = std::mem::take(globally_sorted_batches);
425 self.reservation.free();
426
427 let (in_progress_file, max_record_batch_size) =
428 self.in_progress_spill_file.as_mut().ok_or_else(|| {
429 internal_datafusion_err!("In-progress spill file should be initialized")
430 })?;
431
432 for batch in batches_to_spill {
433 let gc_sliced_size = in_progress_file.append_batch(&batch)?;
434
435 *max_record_batch_size = (*max_record_batch_size).max(gc_sliced_size);
436 }
437
438 assert_or_internal_err!(
439 globally_sorted_batches.is_empty(),
440 "This function consumes globally_sorted_batches, so it should be empty after taking."
441 );
442
443 Ok(())
444 }
445
446 async fn spill_finish(&mut self) -> Result<()> {
448 let (mut in_progress_file, max_record_batch_memory) =
449 self.in_progress_spill_file.take().ok_or_else(|| {
450 internal_datafusion_err!("Should be called after `spill_append`")
451 })?;
452 let spill_file = in_progress_file.finish()?;
453
454 if let Some(spill_file) = spill_file {
455 self.finished_spill_files.push(SortedSpillFile {
456 file: spill_file,
457 max_record_batch_memory,
458 });
459 }
460
461 Ok(())
462 }
463
464 async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
467 assert_or_internal_err!(
468 !self.in_mem_batches.is_empty(),
469 "in_mem_batches must not be empty when attempting to sort and spill"
470 );
471
472 self.merge_reservation.free();
477
478 let mut sorted_stream =
479 self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
480 assert_or_internal_err!(
483 self.in_mem_batches.is_empty(),
484 "in_mem_batches should be empty after constructing sorted stream"
485 );
486 let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
490
491 while let Some(batch) = sorted_stream.next().await {
492 let batch = batch?;
493 let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
494 if self.reservation.try_grow(sorted_size).is_err() {
495 globally_sorted_batches.push(batch);
499 self.consume_and_spill_append(&mut globally_sorted_batches)
500 .await?; } else {
502 globally_sorted_batches.push(batch);
503 }
504 }
505
506 drop(sorted_stream);
509
510 self.consume_and_spill_append(&mut globally_sorted_batches)
511 .await?;
512 self.spill_finish().await?;
513
514 let buffers_cleared_property =
516 self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
517 assert_or_internal_err!(
518 buffers_cleared_property,
519 "in_mem_batches and globally_sorted_batches should be cleared before"
520 );
521
522 self.reserve_memory_for_merge()?;
524
525 Ok(())
526 }
527
528 fn in_mem_sort_stream(
587 &mut self,
588 metrics: BaselineMetrics,
589 ) -> Result<SendableRecordBatchStream> {
590 if self.in_mem_batches.is_empty() {
591 return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
592 &self.schema,
593 ))));
594 }
595
596 let elapsed_compute = metrics.elapsed_compute().clone();
599 let _timer = elapsed_compute.timer();
600
601 if self.in_mem_batches.len() == 1 {
608 let batch = self.in_mem_batches.swap_remove(0);
609 let reservation = self.reservation.take();
610 return self.sort_batch_stream(batch, &metrics, reservation);
611 }
612
613 if self.reservation.size() < self.sort_in_place_threshold_bytes {
615 let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
617 self.in_mem_batches.clear();
618 self.reservation
619 .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
620 .map_err(Self::err_with_oom_context)?;
621 let reservation = self.reservation.take();
622 return self.sort_batch_stream(batch, &metrics, reservation);
623 }
624
625 let streams = std::mem::take(&mut self.in_mem_batches)
626 .into_iter()
627 .map(|batch| {
628 let metrics = self.metrics.baseline.intermediate();
629 let reservation = self
630 .reservation
631 .split(get_reserved_bytes_for_record_batch(&batch)?);
632 let input = self.sort_batch_stream(batch, &metrics, reservation)?;
633 Ok(spawn_buffered(input, 1))
634 })
635 .collect::<Result<_>>()?;
636
637 StreamingMergeBuilder::new()
638 .with_streams(streams)
639 .with_schema(Arc::clone(&self.schema))
640 .with_expressions(&self.expr.clone())
641 .with_metrics(metrics)
642 .with_batch_size(self.batch_size)
643 .with_fetch(None)
644 .with_reservation(self.merge_reservation.new_empty())
645 .build()
646 }
647
648 fn sort_batch_stream(
658 &self,
659 batch: RecordBatch,
660 metrics: &BaselineMetrics,
661 reservation: MemoryReservation,
662 ) -> Result<SendableRecordBatchStream> {
663 assert_eq!(
664 get_reserved_bytes_for_record_batch(&batch)?,
665 reservation.size()
666 );
667
668 let schema = batch.schema();
669 let expressions = self.expr.clone();
670 let batch_size = self.batch_size;
671 let output_row_metrics = metrics.output_rows().clone();
672
673 let stream = futures::stream::once(async move {
674 let schema = batch.schema();
675
676 let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
678
679 let total_sorted_size: usize = sorted_batches
684 .iter()
685 .map(get_record_batch_memory_size)
686 .sum();
687 reservation
688 .try_resize(total_sorted_size)
689 .map_err(Self::err_with_oom_context)?;
690
691 Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
693 Arc::clone(&schema),
694 Box::pin(RecordBatchStreamAdapter::new(
695 Arc::clone(&schema),
696 futures::stream::iter(sorted_batches.into_iter().map(Ok)),
697 )),
698 reservation,
699 )) as SendableRecordBatchStream)
700 })
701 .try_flatten()
702 .map(move |batch| match batch {
703 Ok(batch) => {
704 output_row_metrics.add(batch.num_rows());
705 Ok(batch)
706 }
707 Err(e) => Err(e),
708 });
709
710 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
711 }
712
713 fn reserve_memory_for_merge(&mut self) -> Result<()> {
717 if self.runtime.disk_manager.tmp_files_enabled() {
719 let size = self.sort_spill_reservation_bytes;
720 if self.merge_reservation.size() != size {
721 self.merge_reservation
722 .try_resize(size)
723 .map_err(Self::err_with_oom_context)?;
724 }
725 }
726
727 Ok(())
728 }
729
730 async fn reserve_memory_for_batch_and_maybe_spill(
733 &mut self,
734 input: &RecordBatch,
735 ) -> Result<()> {
736 let size = get_reserved_bytes_for_record_batch(input)?;
737
738 match self.reservation.try_grow(size) {
739 Ok(_) => Ok(()),
740 Err(e) => {
741 if self.in_mem_batches.is_empty() {
742 return Err(Self::err_with_oom_context(e));
743 }
744
745 self.sort_and_spill_in_mem_batches().await?;
747 self.reservation
748 .try_grow(size)
749 .map_err(Self::err_with_oom_context)
750 }
751 }
752 }
753
754 fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
757 match e {
758 DataFusionError::ResourcesExhausted(_) => e.context(
759 "Not enough memory to continue external sort. \
760 Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', \
761 or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'."
762 ),
763 _ => e,
765 }
766 }
767}
768
/// Estimates the number of bytes to reserve for a record batch whose memory
/// size is `record_batch_size` and whose sliced size is `sliced_size`.
///
/// The estimate is the sum of the two sizes. The addition saturates at
/// `usize::MAX` instead of wrapping (as plain `+` does in release builds), so
/// an absurdly large estimate makes the reservation fail rather than reserve
/// far too little.
pub(crate) fn get_reserved_bytes_for_record_batch_size(
    record_batch_size: usize,
    sliced_size: usize,
) -> usize {
    record_batch_size.saturating_add(sliced_size)
}
788
789pub(crate) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result<usize> {
793 batch.get_sliced_size().map(|sliced_size| {
794 get_reserved_bytes_for_record_batch_size(
795 get_record_batch_memory_size(batch),
796 sliced_size,
797 )
798 })
799}
800
801impl Debug for ExternalSorter {
802 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
803 f.debug_struct("ExternalSorter")
804 .field("memory_used", &self.used())
805 .field("spilled_bytes", &self.spilled_bytes())
806 .field("spilled_rows", &self.spilled_rows())
807 .field("spill_count", &self.spill_count())
808 .finish()
809 }
810}
811
812pub fn sort_batch(
813 batch: &RecordBatch,
814 expressions: &LexOrdering,
815 fetch: Option<usize>,
816) -> Result<RecordBatch> {
817 let sort_columns = expressions
818 .iter()
819 .map(|expr| expr.evaluate_to_sort_column(batch))
820 .collect::<Result<Vec<_>>>()?;
821
822 let indices = lexsort_to_indices(&sort_columns, fetch)?;
823 let columns = take_arrays(batch.columns(), &indices, None)?;
824
825 let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
826 Ok(RecordBatch::try_new_with_options(
827 batch.schema(),
828 columns,
829 &options,
830 )?)
831}
832
833pub fn sort_batch_chunked(
837 batch: &RecordBatch,
838 expressions: &LexOrdering,
839 batch_size: usize,
840) -> Result<Vec<RecordBatch>> {
841 IncrementalSortIterator::new(batch.clone(), expressions.clone(), batch_size).collect()
842}
843
/// Execution plan that sorts its input by `expr`.
///
/// With `fetch` set, only the first `fetch` rows are produced (TopK). With
/// `preserve_partitioning`, each input partition is sorted independently;
/// otherwise the plan requires and produces a single partition.
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input plan.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions.
    expr: LexOrdering,
    /// Execution metrics.
    metrics_set: ExecutionPlanMetricsSet,
    /// Sort each input partition separately (`true`) or produce a single
    /// output partition (`false`).
    preserve_partitioning: bool,
    /// Maximum number of rows to return, if any.
    fetch: Option<usize>,
    /// Prefix of `expr` that is already satisfied by the input ordering, as
    /// computed by `extract_common_sort_prefix`.
    common_sort_prefix: Vec<PhysicalSortExpr>,
    /// Cached plan properties.
    cache: Arc<PlanProperties>,
    /// Dynamic filter used by TopK execution and optionally pushed down to
    /// the input.
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
870
871impl SortExec {
872 pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
875 let preserve_partitioning = false;
876 let (cache, sort_prefix) =
877 Self::compute_properties(&input, expr.clone(), preserve_partitioning)
878 .unwrap();
879 Self {
880 expr,
881 input,
882 metrics_set: ExecutionPlanMetricsSet::new(),
883 preserve_partitioning,
884 fetch: None,
885 common_sort_prefix: sort_prefix,
886 cache: Arc::new(cache),
887 filter: None,
888 }
889 }
890
891 pub fn preserve_partitioning(&self) -> bool {
893 self.preserve_partitioning
894 }
895
896 pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
904 self.preserve_partitioning = preserve_partitioning;
905 Arc::make_mut(&mut self.cache).partitioning =
906 Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
907 self
908 }
909
910 fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
912 let children = self
913 .expr
914 .iter()
915 .map(|sort_expr| Arc::clone(&sort_expr.expr))
916 .collect::<Vec<_>>();
917 Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
918 DynamicFilterPhysicalExpr::new(children, lit(true)),
919 ))))
920 }
921
922 fn cloned(&self) -> Self {
923 SortExec {
924 input: Arc::clone(&self.input),
925 expr: self.expr.clone(),
926 metrics_set: self.metrics_set.clone(),
927 preserve_partitioning: self.preserve_partitioning,
928 common_sort_prefix: self.common_sort_prefix.clone(),
929 fetch: self.fetch,
930 cache: Arc::clone(&self.cache),
931 filter: self.filter.clone(),
932 }
933 }
934
935 pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
943 let mut cache = PlanProperties::clone(&self.cache);
944 let is_pipeline_friendly = matches!(
948 cache.emission_type,
949 EmissionType::Incremental | EmissionType::Both
950 );
951 if fetch.is_some() && is_pipeline_friendly {
952 cache = cache.with_boundedness(Boundedness::Bounded);
953 }
954 let filter = fetch.is_some().then(|| {
955 self.filter.clone().unwrap_or_else(|| self.create_filter())
957 });
958 let mut new_sort = self.cloned();
959 new_sort.fetch = fetch;
960 new_sort.cache = cache.into();
961 new_sort.filter = filter;
962 new_sort
963 }
964
965 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
967 &self.input
968 }
969
970 pub fn expr(&self) -> &LexOrdering {
972 &self.expr
973 }
974
975 pub fn fetch(&self) -> Option<usize> {
977 self.fetch
978 }
979
980 pub fn dynamic_filter_expr(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
982 self.filter.as_ref().map(|f| f.read().expr())
983 }
984
985 pub fn with_dynamic_filter_expr(
993 mut self,
994 filter: Arc<DynamicFilterPhysicalExpr>,
995 ) -> Result<Self> {
996 let input_schema = self.input.schema();
997 for child in filter.children() {
998 child.data_type(&input_schema)?;
999 }
1000 self.filter = Some(Arc::new(RwLock::new(TopKDynamicFilters::new(filter))));
1001 Ok(self)
1002 }
1003
1004 fn output_partitioning_helper(
1005 input: &Arc<dyn ExecutionPlan>,
1006 preserve_partitioning: bool,
1007 ) -> Partitioning {
1008 if preserve_partitioning {
1010 input.output_partitioning().clone()
1011 } else {
1012 Partitioning::UnknownPartitioning(1)
1013 }
1014 }
1015
1016 fn compute_properties(
1019 input: &Arc<dyn ExecutionPlan>,
1020 sort_exprs: LexOrdering,
1021 preserve_partitioning: bool,
1022 ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
1023 let (sort_prefix, sort_satisfied) = input
1024 .equivalence_properties()
1025 .extract_common_sort_prefix(sort_exprs.clone())?;
1026
1027 let emission_type = if sort_satisfied {
1031 input.pipeline_behavior()
1032 } else {
1033 EmissionType::Final
1034 };
1035
1036 let boundedness = if sort_satisfied {
1042 input.boundedness()
1043 } else {
1044 match input.boundedness() {
1045 Boundedness::Unbounded { .. } => Boundedness::Unbounded {
1046 requires_infinite_memory: true,
1047 },
1048 bounded => bounded,
1049 }
1050 };
1051
1052 let mut eq_properties = input.equivalence_properties().clone();
1055 eq_properties.reorder(sort_exprs)?;
1056
1057 let output_partitioning =
1059 Self::output_partitioning_helper(input, preserve_partitioning);
1060
1061 Ok((
1062 PlanProperties::new(
1063 eq_properties,
1064 output_partitioning,
1065 emission_type,
1066 boundedness,
1067 ),
1068 sort_prefix,
1069 ))
1070 }
1071}
1072
1073impl DisplayAs for SortExec {
1074 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1075 match t {
1076 DisplayFormatType::Default | DisplayFormatType::Verbose => {
1077 let preserve_partitioning = self.preserve_partitioning;
1078 match self.fetch {
1079 Some(fetch) => {
1080 write!(
1081 f,
1082 "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
1083 self.expr
1084 )?;
1085 if let Some(filter) = &self.filter
1086 && let Ok(current) = filter.read().expr().current()
1087 && !current.eq(&lit(true))
1088 {
1089 write!(f, ", filter=[{current}]")?;
1090 }
1091 if !self.common_sort_prefix.is_empty() {
1092 write!(f, ", sort_prefix=[")?;
1093 let mut first = true;
1094 for sort_expr in &self.common_sort_prefix {
1095 if first {
1096 first = false;
1097 } else {
1098 write!(f, ", ")?;
1099 }
1100 write!(f, "{sort_expr}")?;
1101 }
1102 write!(f, "]")
1103 } else {
1104 Ok(())
1105 }
1106 }
1107 None => write!(
1108 f,
1109 "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
1110 self.expr
1111 ),
1112 }
1113 }
1114 DisplayFormatType::TreeRender => match self.fetch {
1115 Some(fetch) => {
1116 writeln!(f, "{}", self.expr)?;
1117 writeln!(f, "limit={fetch}")
1118 }
1119 None => {
1120 writeln!(f, "{}", self.expr)
1121 }
1122 },
1123 }
1124 }
1125}
1126
1127impl ExecutionPlan for SortExec {
1128 fn name(&self) -> &'static str {
1129 match self.fetch {
1130 Some(_) => "SortExec(TopK)",
1131 None => "SortExec",
1132 }
1133 }
1134
1135 fn properties(&self) -> &Arc<PlanProperties> {
1136 &self.cache
1137 }
1138
1139 fn required_input_distribution(&self) -> Vec<Distribution> {
1140 if self.preserve_partitioning {
1141 vec![Distribution::UnspecifiedDistribution]
1142 } else {
1143 vec![Distribution::SinglePartition]
1146 }
1147 }
1148
1149 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1150 vec![&self.input]
1151 }
1152
1153 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
1154 vec![false]
1155 }
1156
1157 fn with_new_children(
1158 self: Arc<Self>,
1159 children: Vec<Arc<dyn ExecutionPlan>>,
1160 ) -> Result<Arc<dyn ExecutionPlan>> {
1161 let mut new_sort = self.cloned();
1162 assert_eq!(children.len(), 1, "SortExec should have exactly one child");
1163 new_sort.input = Arc::clone(&children[0]);
1164
1165 if !has_same_children_properties(self.as_ref(), &children)? {
1166 let (cache, sort_prefix) = Self::compute_properties(
1168 &new_sort.input,
1169 new_sort.expr.clone(),
1170 new_sort.preserve_partitioning,
1171 )?;
1172 new_sort.cache = Arc::new(cache);
1173 new_sort.common_sort_prefix = sort_prefix;
1174 }
1175
1176 Ok(Arc::new(new_sort))
1177 }
1178
1179 fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1180 let children = self.children().into_iter().cloned().collect();
1181 let new_sort = self.with_new_children(children)?;
1182 let mut new_sort = new_sort
1183 .downcast_ref::<SortExec>()
1184 .expect("cloned 1 lines above this line, we know the type")
1185 .clone();
1186 new_sort.filter = Some(new_sort.create_filter());
1188 new_sort.metrics_set = ExecutionPlanMetricsSet::new();
1189
1190 Ok(Arc::new(new_sort))
1191 }
1192
1193 fn execute(
1194 &self,
1195 partition: usize,
1196 context: Arc<TaskContext>,
1197 ) -> Result<SendableRecordBatchStream> {
1198 trace!(
1199 "Start SortExec::execute for partition {} of context session_id {} and task_id {:?}",
1200 partition,
1201 context.session_id(),
1202 context.task_id()
1203 );
1204
1205 let mut input = self.input.execute(partition, Arc::clone(&context))?;
1206
1207 let execution_options = &context.session_config().options().execution;
1208
1209 trace!("End SortExec's input.execute for partition: {partition}");
1210
1211 let sort_satisfied = self
1212 .input
1213 .equivalence_properties()
1214 .ordering_satisfy(self.expr.clone())?;
1215
1216 match (sort_satisfied, self.fetch.as_ref()) {
1217 (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
1218 input,
1219 0,
1220 Some(*fetch),
1221 BaselineMetrics::new(&self.metrics_set, partition),
1222 ))),
1223 (true, None) => Ok(input),
1224 (false, Some(fetch)) => {
1225 let filter = self.filter.clone();
1226 let mut topk = TopK::try_new(
1227 partition,
1228 input.schema(),
1229 self.common_sort_prefix.clone(),
1230 self.expr.clone(),
1231 *fetch,
1232 context.session_config().batch_size(),
1233 context.runtime_env(),
1234 &self.metrics_set,
1235 Arc::clone(&unwrap_or_internal_err!(filter)),
1236 )?;
1237 Ok(Box::pin(RecordBatchStreamAdapter::new(
1238 self.schema(),
1239 futures::stream::once(async move {
1240 while let Some(batch) = input.next().await {
1241 let batch = batch?;
1242 topk.insert_batch(batch)?;
1243 if topk.finished {
1244 break;
1245 }
1246 }
1247 drop(input);
1248 topk.emit()
1249 })
1250 .try_flatten(),
1251 )))
1252 }
1253 (false, None) => {
1254 let mut sorter = ExternalSorter::new(
1255 partition,
1256 input.schema(),
1257 self.expr.clone(),
1258 context.session_config().batch_size(),
1259 execution_options.sort_spill_reservation_bytes,
1260 execution_options.sort_in_place_threshold_bytes,
1261 context.session_config().spill_compression(),
1262 &self.metrics_set,
1263 context.runtime_env(),
1264 )?;
1265 Ok(Box::pin(RecordBatchStreamAdapter::new(
1266 self.schema(),
1267 futures::stream::once(async move {
1268 while let Some(batch) = input.next().await {
1269 let batch = batch?;
1270 sorter.insert_batch(batch).await?;
1271 }
1272 drop(input);
1273 sorter.sort().await
1274 })
1275 .try_flatten(),
1276 )))
1277 }
1278 }
1279 }
1280
1281 fn metrics(&self) -> Option<MetricsSet> {
1282 Some(self.metrics_set.clone_inner())
1283 }
1284
1285 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
1286 let p = if !self.preserve_partitioning() {
1287 None
1288 } else {
1289 partition
1290 };
1291 let stats = Arc::unwrap_or_clone(self.input.partition_statistics(p)?);
1292 Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
1293 }
1294
1295 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1296 Some(Arc::new(SortExec::with_fetch(self, limit)))
1297 }
1298
1299 fn fetch(&self) -> Option<usize> {
1300 self.fetch
1301 }
1302
1303 fn cardinality_effect(&self) -> CardinalityEffect {
1304 if self.fetch.is_none() {
1305 CardinalityEffect::Equal
1306 } else {
1307 CardinalityEffect::LowerEqual
1308 }
1309 }
1310
1311 fn try_swapping_with_projection(
1315 &self,
1316 projection: &ProjectionExec,
1317 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1318 if projection.expr().len() >= projection.input().schema().fields().len() {
1320 return Ok(None);
1321 }
1322
1323 let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
1324 else {
1325 return Ok(None);
1326 };
1327
1328 Ok(Some(Arc::new(
1329 SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
1330 .with_fetch(self.fetch())
1331 .with_preserve_partitioning(self.preserve_partitioning()),
1332 )))
1333 }
1334
1335 fn gather_filters_for_pushdown(
1336 &self,
1337 phase: FilterPushdownPhase,
1338 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1339 config: &datafusion_common::config::ConfigOptions,
1340 ) -> Result<FilterDescription> {
1341 if phase != FilterPushdownPhase::Post {
1342 if self.fetch.is_some() {
1343 return Ok(FilterDescription::all_unsupported(
1344 &parent_filters,
1345 &self.children(),
1346 ));
1347 }
1348 return FilterDescription::from_children(parent_filters, &self.children());
1349 }
1350
1351 let mut child = if self.fetch.is_some() {
1354 ChildFilterDescription::all_unsupported(&parent_filters)
1355 } else {
1356 ChildFilterDescription::from_child(&parent_filters, self.input())?
1357 };
1358
1359 if let Some(filter) = &self.filter
1360 && config.optimizer.enable_topk_dynamic_filter_pushdown
1361 {
1362 child = child.with_self_filter(filter.read().expr());
1363 }
1364
1365 Ok(FilterDescription::new().with_child(child))
1366 }
1367
1368 fn handle_child_pushdown_result(
1369 &self,
1370 _phase: FilterPushdownPhase,
1371 child_pushdown_result: ChildPushdownResult,
1372 _config: &datafusion_common::config::ConfigOptions,
1373 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1374 if self.fetch.is_some() {
1386 return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
1387 }
1388
1389 let unsupported_filters: Vec<Arc<dyn PhysicalExpr>> = child_pushdown_result
1391 .parent_filters
1392 .iter()
1393 .filter(|&f| matches!(f.all(), PushedDown::No))
1394 .map(|f| Arc::clone(&f.filter))
1395 .collect();
1396
1397 if unsupported_filters.is_empty() {
1398 return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
1400 }
1401
1402 let predicate = datafusion_physical_expr::conjunction(unsupported_filters);
1405 let new_child =
1406 Arc::new(FilterExec::try_new(predicate, Arc::clone(self.input()))?)
1407 as Arc<dyn ExecutionPlan>;
1408 let new_sort = Arc::new(
1409 SortExec::new(self.expr.clone(), new_child)
1410 .with_fetch(self.fetch())
1411 .with_preserve_partitioning(self.preserve_partitioning()),
1412 ) as Arc<dyn ExecutionPlan>;
1413
1414 Ok(FilterPushdownPropagation {
1415 filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
1416 updated_node: Some(new_sort),
1417 })
1418 }
1419}
1420
1421#[cfg(test)]
1422mod tests {
1423 use std::collections::HashMap;
1424 use std::pin::Pin;
1425 use std::task::{Context, Poll};
1426
1427 use super::*;
1428 use crate::coalesce_partitions::CoalescePartitionsExec;
1429 use crate::collect;
1430 use crate::empty::EmptyExec;
1431 use crate::execution_plan::Boundedness;
1432 use crate::expressions::col;
1433 use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
1434 use crate::test;
1435 use crate::test::TestMemoryExec;
1436 use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
1437 use crate::test::{assert_is_pending, make_partition};
1438
1439 use arrow::array::*;
1440 use arrow::compute::SortOptions;
1441 use arrow::datatypes::*;
1442 use datafusion_common::ScalarValue;
1443 use datafusion_common::cast::as_primitive_array;
1444 use datafusion_common::config::ConfigOptions;
1445 use datafusion_common::test_util::batches_to_string;
1446 use datafusion_execution::RecordBatchStream;
1447 use datafusion_execution::config::SessionConfig;
1448 use datafusion_execution::memory_pool::{
1449 GreedyMemoryPool, MemoryConsumer, MemoryPool,
1450 };
1451 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1452 use datafusion_physical_expr::EquivalenceProperties;
1453 use datafusion_physical_expr::expressions::{Column, Literal};
1454
1455 use futures::{FutureExt, Stream, TryStreamExt};
1456 use insta::assert_snapshot;
1457
1458 #[derive(Debug, Clone)]
1459 pub struct SortedUnboundedExec {
1460 schema: Schema,
1461 batch_size: u64,
1462 cache: Arc<PlanProperties>,
1463 }
1464
1465 impl DisplayAs for SortedUnboundedExec {
1466 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1467 match t {
1468 DisplayFormatType::Default
1469 | DisplayFormatType::Verbose
1470 | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1471 }
1472 Ok(())
1473 }
1474 }
1475
1476 impl SortedUnboundedExec {
1477 fn compute_properties(schema: SchemaRef) -> PlanProperties {
1478 let mut eq_properties = EquivalenceProperties::new(schema);
1479 eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
1480 Column::new("c1", 0),
1481 ))]);
1482 PlanProperties::new(
1483 eq_properties,
1484 Partitioning::UnknownPartitioning(1),
1485 EmissionType::Final,
1486 Boundedness::Unbounded {
1487 requires_infinite_memory: false,
1488 },
1489 )
1490 }
1491 }
1492
1493 impl ExecutionPlan for SortedUnboundedExec {
1494 fn name(&self) -> &'static str {
1495 Self::static_name()
1496 }
1497
1498 fn properties(&self) -> &Arc<PlanProperties> {
1499 &self.cache
1500 }
1501
1502 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1503 vec![]
1504 }
1505
1506 fn with_new_children(
1507 self: Arc<Self>,
1508 _: Vec<Arc<dyn ExecutionPlan>>,
1509 ) -> Result<Arc<dyn ExecutionPlan>> {
1510 Ok(self)
1511 }
1512
1513 fn execute(
1514 &self,
1515 _partition: usize,
1516 _context: Arc<TaskContext>,
1517 ) -> Result<SendableRecordBatchStream> {
1518 Ok(Box::pin(SortedUnboundedStream {
1519 schema: Arc::new(self.schema.clone()),
1520 batch_size: self.batch_size,
1521 offset: 0,
1522 }))
1523 }
1524 }
1525
/// Stream created by `SortedUnboundedExec::execute`.
///
/// Every poll yields the next `batch_size` consecutive `u64` values starting
/// at `offset`; it never ends.
#[derive(Debug)]
pub struct SortedUnboundedStream {
    /// Schema of each emitted batch; batches are built as a single `UInt64` column.
    schema: SchemaRef,
    /// Rows per emitted batch.
    batch_size: u64,
    /// First value of the next batch to be emitted.
    offset: u64,
}
1532
1533 impl Stream for SortedUnboundedStream {
1534 type Item = Result<RecordBatch>;
1535
1536 fn poll_next(
1537 mut self: Pin<&mut Self>,
1538 _cx: &mut Context<'_>,
1539 ) -> Poll<Option<Self::Item>> {
1540 let batch = SortedUnboundedStream::create_record_batch(
1541 Arc::clone(&self.schema),
1542 self.offset,
1543 self.batch_size,
1544 );
1545 self.offset += self.batch_size;
1546 Poll::Ready(Some(Ok(batch)))
1547 }
1548 }
1549
impl RecordBatchStream for SortedUnboundedStream {
    /// Returns a shared handle (refcount bump) to the schema of every batch
    /// this stream yields.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
1555
1556 impl SortedUnboundedStream {
1557 fn create_record_batch(
1558 schema: SchemaRef,
1559 offset: u64,
1560 batch_size: u64,
1561 ) -> RecordBatch {
1562 let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1563 let array = UInt64Array::from(values);
1564 let array_ref: ArrayRef = Arc::new(array);
1565 RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1566 }
1567 }
1568
1569 #[tokio::test]
1570 async fn test_in_mem_sort() -> Result<()> {
1571 let task_ctx = Arc::new(TaskContext::default());
1572 let partitions = 4;
1573 let csv = test::scan_partitioned(partitions);
1574 let schema = csv.schema();
1575
1576 let sort_exec = Arc::new(SortExec::new(
1577 [PhysicalSortExpr {
1578 expr: col("i", &schema)?,
1579 options: SortOptions::default(),
1580 }]
1581 .into(),
1582 Arc::new(CoalescePartitionsExec::new(csv)),
1583 ));
1584
1585 let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
1586
1587 assert_eq!(result.len(), 1);
1588 assert_eq!(result[0].num_rows(), 400);
1589 assert_eq!(
1590 task_ctx.runtime_env().memory_pool.reserved(),
1591 0,
1592 "The sort should have returned all memory used back to the memory manager"
1593 );
1594
1595 Ok(())
1596 }
1597
/// Sorts 100 coalesced partitions under a tight memory pool and checks that
/// the sort spilled, produced the expected rows, and released all memory.
#[tokio::test]
async fn test_sort_spill() -> Result<()> {
    // The pool allows only the default merge reservation plus 12 KiB of
    // working memory, which this test expects to force spilling.
    let session_config = SessionConfig::new();
    let sort_spill_reservation_bytes = session_config
        .options()
        .execution
        .sort_spill_reservation_bytes;
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
        .build_arc()?;
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    );

    let partitions = 100;
    let input = test::scan_partitioned(partitions);
    let schema = input.schema();

    // Sort all partitions as one stream, ascending on `i`.
    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr {
            expr: col("i", &schema)?,
            options: SortOptions::default(),
        }]
        .into(),
        Arc::new(CoalescePartitionsExec::new(input)),
    ));

    let result = collect(
        Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
        Arc::clone(&task_ctx),
    )
    .await?;

    // NOTE(review): the spilled/merged output is expected in exactly 2 batches — confirm why 2.
    assert_eq!(result.len(), 2);

    let metrics = sort_exec.metrics().unwrap();

    assert_eq!(metrics.output_rows().unwrap(), 10000);
    assert!(metrics.elapsed_compute().unwrap() > 0);

    // Spill metrics are checked against ranges rather than exact values.
    let spill_count = metrics.spill_count().unwrap();
    let spilled_rows = metrics.spilled_rows().unwrap();
    let spilled_bytes = metrics.spilled_bytes().unwrap();
    assert!((3..=10).contains(&spill_count));
    assert!((9000..=10000).contains(&spilled_rows));
    assert!((38000..=44000).contains(&spilled_bytes));

    let columns = result[0].columns();

    // The first output batch begins at 0 and ends at 81.
    let i = as_primitive_array::<Int32Type>(&columns[0])?;
    assert_eq!(i.value(0), 0);
    assert_eq!(i.value(i.len() - 1), 81);
    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "The sort should have returned all memory used back to the memory manager"
    );

    Ok(())
}
1668
/// Configures a memory pool one byte smaller than a single input batch needs
/// and checks that the sort fails with a `ResourcesExhausted` root cause
/// wrapped in a `Context` error, and that the message names the relevant
/// configuration keys.
#[tokio::test]
async fn test_batch_reservation_error() -> Result<()> {
    // No merge reservation, so the limit below is driven purely by one batch.
    let merge_reservation: usize = 0;
    let session_config =
        SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);

    let plan = test::scan_partitioned(1);

    // Measure how many bytes one input batch reserves, using an unlimited context.
    let expected_batch_reservation = {
        let temp_ctx = Arc::new(TaskContext::default());
        let mut stream = plan.execute(0, Arc::clone(&temp_ctx))?;
        let first_batch = stream.next().await.unwrap()?;
        get_reserved_bytes_for_record_batch(&first_batch)?
    };

    // One byte short of what the first batch requires.
    let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;

    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(memory_limit, 1.0)
        .build_arc()?;
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    );

    // Sanity check: under the limited context the batch size is unchanged and
    // indeed exceeds the limit.
    {
        let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
        let first_batch = stream.next().await.unwrap()?;
        let batch_reservation = get_reserved_bytes_for_record_batch(&first_batch)?;

        assert_eq!(batch_reservation, expected_batch_reservation);
        assert!(memory_limit < (merge_reservation + batch_reservation));
    }

    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
        plan,
    ));

    let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;

    let err = result.unwrap_err();
    assert!(
        matches!(err, DataFusionError::Context(..)),
        "Assertion failed: expected a Context error, but got: {err:?}"
    );

    // The underlying cause must be memory exhaustion.
    assert!(
        matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
        "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
    );

    // The error text should point users at the settings they can tune.
    let config_vector = vec![
        "datafusion.runtime.memory_limit",
        "datafusion.execution.sort_spill_reservation_bytes",
    ];
    let error_message = err.message().to_string();
    for config in config_vector.into_iter() {
        assert!(
            error_message.as_str().contains(config),
            "Config: '{}' should be contained in error message: {}.",
            config,
            error_message.as_str()
        );
    }

    Ok(())
}
1745
/// Sorts string data from 200 partitions under a constrained pool and checks
/// spill metrics, global ordering of the output, and memory release.
#[tokio::test]
async fn test_sort_spill_utf8_strings() -> Result<()> {
    // Small output batches, a 20 KiB in-place threshold, a 100 KiB merge
    // reservation and a 500 KiB pool.
    let session_config = SessionConfig::new()
        .with_batch_size(100)
        .with_sort_in_place_threshold_bytes(20 * 1024)
        .with_sort_spill_reservation_bytes(100 * 1024);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(500 * 1024, 1.0)
        .build_arc()?;
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    );

    let input = test::scan_partitioned_utf8(200);
    let schema = input.schema();

    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr {
            expr: col("i", &schema)?,
            options: SortOptions::default(),
        }]
        .into(),
        Arc::new(CoalescePartitionsExec::new(input)),
    ));

    let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;

    // No rows may be lost or duplicated across spill/merge.
    let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
    assert_eq!(num_rows, 20000);

    let metrics = sort_exec.metrics().unwrap();

    assert_eq!(metrics.output_rows().unwrap(), 20000);
    assert!(metrics.elapsed_compute().unwrap() > 0);

    let spill_count = metrics.spill_count().unwrap();
    let spilled_rows = metrics.spilled_rows().unwrap();
    let spilled_bytes = metrics.spilled_bytes().unwrap();

    // Spill metrics are checked against ranges rather than exact values.
    assert!((4..=8).contains(&spill_count));
    assert!((15000..=20000).contains(&spilled_rows));
    assert!((900000..=1000000).contains(&spilled_bytes));

    // The output must be sorted across batch boundaries, not only within each batch.
    let concated_result = concat_batches(&schema, &result)?;
    let columns = concated_result.columns();
    let string_array = as_string_array(&columns[0]);
    for i in 0..string_array.len() - 1 {
        assert!(string_array.value(i) <= string_array.value(i + 1));
    }

    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "The sort should have returned all memory used back to the memory manager"
    );

    Ok(())
}
1824
/// Checks whether a sort spills under a fixed memory budget, with and without
/// a `fetch` limit.
#[tokio::test]
async fn test_sort_fetch_memory_calculation() -> Result<()> {
    // NOTE(review): 400 appears to be an estimate of one partition's batch size in
    // bytes; the budget leaves room for `partitions - 1` of them — confirm.
    let avg_batch_size = 400;
    let partitions = 4;

    // (fetch, expect_spillage): the full sort is expected to spill under this
    // budget, while `fetch = Some(1)` is expected not to.
    let test_options = vec![
        (None, true),
        (Some(1), false),
    ];

    for (fetch, expect_spillage) in test_options {
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;

        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(
                sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                1.0,
            )
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_runtime(runtime)
                .with_session_config(session_config),
        );

        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        let sort_exec = Arc::new(
            SortExec::new(
                [PhysicalSortExpr {
                    expr: col("i", &schema)?,
                    options: SortOptions::default(),
                }]
                .into(),
                Arc::new(CoalescePartitionsExec::new(csv)),
            )
            .with_fetch(fetch),
        );

        let result =
            collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
        assert_eq!(result.len(), 1);

        // A missing spill_count metric is treated as "did not spill".
        let metrics = sort_exec.metrics().unwrap();
        let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
        assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
    }
    Ok(())
}
1885
/// Pulls sorted output one batch at a time and checks that the memory pool's
/// reserved bytes never grow between emitted batches and reach zero at the end.
#[tokio::test]
async fn test_sort_memory_reduction_per_batch() -> Result<()> {
    // 1000 rows emitted in batches of 50 rows.
    let batch_size = 50;
    let num_rows = 1000;
    let task_ctx = Arc::new(
        TaskContext::default().with_session_config(
            SessionConfig::new()
                .with_batch_size(batch_size)
                // NOTE(review): usize::MAX presumably keeps the input on the in-place sort path — confirm.
                .with_sort_in_place_threshold_bytes(usize::MAX),
        ),
    );

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Descending input so every row must be reordered.
    let mut values: Vec<i32> = (0..num_rows).collect();
    values.reverse();

    let input_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(values))],
    )?;

    let batches = vec![input_batch];

    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr {
            expr: Arc::new(Column::new("a", 0)),
            options: SortOptions::default(),
        }]
        .into(),
        TestMemoryExec::try_new_exec(
            std::slice::from_ref(&batches),
            Arc::clone(&schema),
            None,
        )?,
    ));

    let mut stream = sort_exec.execute(0, Arc::clone(&task_ctx))?;

    let mut previous_reserved = task_ctx.runtime_env().memory_pool.reserved();
    let mut batch_count = 0;

    // Sample the pool after each batch is yielded.
    while let Some(result) = stream.next().await {
        let batch = result?;
        batch_count += 1;

        assert!(batch.num_rows() > 0, "Batch should not be empty");

        let current_reserved = task_ctx.runtime_env().memory_pool.reserved();

        // From the second batch on, the reservation must be non-increasing.
        if batch_count > 1 {
            assert!(
                current_reserved <= previous_reserved,
                "Memory reservation should decrease or stay same as batches are emitted. \
             Batch {batch_count}: previous={previous_reserved}, current={current_reserved}"
            );
        }

        previous_reserved = current_reserved;
    }

    // The per-batch check is meaningless unless output was actually split.
    assert!(
        batch_count > 1,
        "Expected multiple batches to be emitted, got {batch_count}"
    );

    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "All memory should be returned after consuming all batches"
    );

    Ok(())
}
1972
1973 #[tokio::test]
1974 async fn test_sort_metadata() -> Result<()> {
1975 let task_ctx = Arc::new(TaskContext::default());
1976 let field_metadata: HashMap<String, String> =
1977 vec![("foo".to_string(), "bar".to_string())]
1978 .into_iter()
1979 .collect();
1980 let schema_metadata: HashMap<String, String> =
1981 vec![("baz".to_string(), "barf".to_string())]
1982 .into_iter()
1983 .collect();
1984
1985 let mut field = Field::new("field_name", DataType::UInt64, true);
1986 field.set_metadata(field_metadata.clone());
1987 let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1988 let schema = Arc::new(schema);
1989
1990 let data: ArrayRef =
1991 Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1992
1993 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1994 let input =
1995 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1996
1997 let sort_exec = Arc::new(SortExec::new(
1998 [PhysicalSortExpr {
1999 expr: col("field_name", &schema)?,
2000 options: SortOptions::default(),
2001 }]
2002 .into(),
2003 input,
2004 ));
2005
2006 let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
2007
2008 let expected_data: ArrayRef =
2009 Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
2010 let expected_batch =
2011 RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
2012
2013 assert_eq!(&vec![expected_batch], &result);
2015
2016 assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
2018 assert_eq!(result[0].schema().metadata(), &schema_metadata);
2019
2020 Ok(())
2021 }
2022
2023 #[tokio::test]
2024 async fn test_lex_sort_by_mixed_types() -> Result<()> {
2025 let task_ctx = Arc::new(TaskContext::default());
2026 let schema = Arc::new(Schema::new(vec![
2027 Field::new("a", DataType::Int32, true),
2028 Field::new(
2029 "b",
2030 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
2031 true,
2032 ),
2033 ]));
2034
2035 let batch = RecordBatch::try_new(
2037 Arc::clone(&schema),
2038 vec![
2039 Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
2040 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
2041 Some(vec![Some(3)]),
2042 Some(vec![Some(1)]),
2043 Some(vec![Some(6), None]),
2044 Some(vec![Some(5)]),
2045 ])),
2046 ],
2047 )?;
2048
2049 let sort_exec = Arc::new(SortExec::new(
2050 [
2051 PhysicalSortExpr {
2052 expr: col("a", &schema)?,
2053 options: SortOptions {
2054 descending: false,
2055 nulls_first: true,
2056 },
2057 },
2058 PhysicalSortExpr {
2059 expr: col("b", &schema)?,
2060 options: SortOptions {
2061 descending: true,
2062 nulls_first: false,
2063 },
2064 },
2065 ]
2066 .into(),
2067 TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
2068 ));
2069
2070 assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
2071 assert_eq!(
2072 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
2073 *sort_exec.schema().field(1).data_type()
2074 );
2075
2076 let result: Vec<RecordBatch> =
2077 collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
2078 let metrics = sort_exec.metrics().unwrap();
2079 assert!(metrics.elapsed_compute().unwrap() > 0);
2080 assert_eq!(metrics.output_rows().unwrap(), 4);
2081 assert_eq!(result.len(), 1);
2082
2083 let expected = RecordBatch::try_new(
2084 schema,
2085 vec![
2086 Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
2087 Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
2088 Some(vec![Some(1)]),
2089 Some(vec![Some(6), None]),
2090 Some(vec![Some(5)]),
2091 Some(vec![Some(3)]),
2092 ])),
2093 ],
2094 )?;
2095
2096 assert_eq!(expected, result[0]);
2097
2098 Ok(())
2099 }
2100
2101 #[tokio::test]
2102 async fn test_lex_sort_by_float() -> Result<()> {
2103 let task_ctx = Arc::new(TaskContext::default());
2104 let schema = Arc::new(Schema::new(vec![
2105 Field::new("a", DataType::Float32, true),
2106 Field::new("b", DataType::Float64, true),
2107 ]));
2108
2109 let batch = RecordBatch::try_new(
2111 Arc::clone(&schema),
2112 vec![
2113 Arc::new(Float32Array::from(vec![
2114 Some(f32::NAN),
2115 None,
2116 None,
2117 Some(f32::NAN),
2118 Some(1.0_f32),
2119 Some(1.0_f32),
2120 Some(2.0_f32),
2121 Some(3.0_f32),
2122 ])),
2123 Arc::new(Float64Array::from(vec![
2124 Some(200.0_f64),
2125 Some(20.0_f64),
2126 Some(10.0_f64),
2127 Some(100.0_f64),
2128 Some(f64::NAN),
2129 None,
2130 None,
2131 Some(f64::NAN),
2132 ])),
2133 ],
2134 )?;
2135
2136 let sort_exec = Arc::new(SortExec::new(
2137 [
2138 PhysicalSortExpr {
2139 expr: col("a", &schema)?,
2140 options: SortOptions {
2141 descending: true,
2142 nulls_first: true,
2143 },
2144 },
2145 PhysicalSortExpr {
2146 expr: col("b", &schema)?,
2147 options: SortOptions {
2148 descending: false,
2149 nulls_first: false,
2150 },
2151 },
2152 ]
2153 .into(),
2154 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
2155 ));
2156
2157 assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
2158 assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
2159
2160 let result: Vec<RecordBatch> =
2161 collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
2162 let metrics = sort_exec.metrics().unwrap();
2163 assert!(metrics.elapsed_compute().unwrap() > 0);
2164 assert_eq!(metrics.output_rows().unwrap(), 8);
2165 assert_eq!(result.len(), 1);
2166
2167 let columns = result[0].columns();
2168
2169 assert_eq!(DataType::Float32, *columns[0].data_type());
2170 assert_eq!(DataType::Float64, *columns[1].data_type());
2171
2172 let a = as_primitive_array::<Float32Type>(&columns[0])?;
2173 let b = as_primitive_array::<Float64Type>(&columns[1])?;
2174
2175 let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
2177 .map(|i| {
2178 let aval = if a.is_valid(i) {
2179 Some(a.value(i).to_string())
2180 } else {
2181 None
2182 };
2183 let bval = if b.is_valid(i) {
2184 Some(b.value(i).to_string())
2185 } else {
2186 None
2187 };
2188 (aval, bval)
2189 })
2190 .collect();
2191
2192 let expected: Vec<(Option<String>, Option<String>)> = vec![
2193 (None, Some("10".to_owned())),
2194 (None, Some("20".to_owned())),
2195 (Some("NaN".to_owned()), Some("100".to_owned())),
2196 (Some("NaN".to_owned()), Some("200".to_owned())),
2197 (Some("3".to_owned()), Some("NaN".to_owned())),
2198 (Some("2".to_owned()), None),
2199 (Some("1".to_owned()), Some("NaN".to_owned())),
2200 (Some("1".to_owned()), None),
2201 ];
2202
2203 assert_eq!(expected, result);
2204
2205 Ok(())
2206 }
2207
/// Dropping a pending sort future must release the input plan's references
/// and all reserved memory.
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
    let task_ctx = Arc::new(TaskContext::default());
    let schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

    // `refs` lets the test observe when the blocking input is fully released.
    let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
    let refs = blocking_exec.refs();
    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr {
            expr: col("a", &schema)?,
            options: SortOptions::default(),
        }]
        .into(),
        blocking_exec,
    ));

    let fut = collect(sort_exec, Arc::clone(&task_ctx));
    let mut fut = fut.boxed();

    // The future must still be pending (the input blocks), then it is cancelled by drop.
    assert_is_pending(&mut fut);
    drop(fut);
    assert_strong_count_converges_to_zero(refs).await;

    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "The sort should have returned all memory used back to the memory manager"
    );

    Ok(())
}
2240
/// A batch with no columns but one logical row, sorted by a constant
/// expression, keeps its row count.
#[test]
fn test_empty_sort_batch() {
    let options = RecordBatchOptions::new().with_row_count(Some(1));
    let batch = RecordBatch::try_new_with_options(
        Arc::new(Schema::empty()),
        vec![],
        &options,
    )
    .unwrap();

    let constant_key = Arc::new(Literal::new(ScalarValue::Int64(Some(1))));
    let expressions: LexOrdering = [PhysicalSortExpr::new_default(constant_key)].into();

    let sorted = sort_batch(&batch, &expressions, None).unwrap();
    assert_eq!(sorted.num_rows(), 1);
}
2258
2259 #[tokio::test]
2260 async fn topk_unbounded_source() -> Result<()> {
2261 let task_ctx = Arc::new(TaskContext::default());
2262 let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
2263 let source = SortedUnboundedExec {
2264 schema: schema.clone(),
2265 batch_size: 2,
2266 cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new(
2267 schema.clone(),
2268 ))),
2269 };
2270 let mut plan = SortExec::new(
2271 [PhysicalSortExpr::new_default(Arc::new(Column::new(
2272 "c1", 0,
2273 )))]
2274 .into(),
2275 Arc::new(source),
2276 );
2277 plan = plan.with_fetch(Some(9));
2278
2279 let batches = collect(Arc::new(plan), task_ctx).await?;
2280 assert_snapshot!(batches_to_string(&batches), @r"
2281 +----+
2282 | c1 |
2283 +----+
2284 | 0 |
2285 | 1 |
2286 | 2 |
2287 | 3 |
2288 | 4 |
2289 | 5 |
2290 | 6 |
2291 | 7 |
2292 | 8 |
2293 +----+
2294 ");
2295 Ok(())
2296 }
2297
2298 #[tokio::test]
2299 async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2300 let batch_size = 100;
2301
2302 let create_task_ctx = |_: &[RecordBatch]| {
2303 TaskContext::default().with_session_config(
2304 SessionConfig::new()
2305 .with_batch_size(batch_size)
2306 .with_sort_in_place_threshold_bytes(usize::MAX),
2307 )
2308 };
2309
2310 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2312
2313 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2315
2316 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2318
2319 Ok(())
2320 }
2321
2322 #[tokio::test]
2323 async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place()
2324 -> Result<()> {
2325 let batch_size = 100;
2326
2327 let create_task_ctx = |_: &[RecordBatch]| {
2328 TaskContext::default().with_session_config(
2329 SessionConfig::new()
2330 .with_batch_size(batch_size)
2331 .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2332 )
2333 };
2334
2335 {
2337 let metrics =
2338 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2339
2340 assert_eq!(
2341 metrics.spill_count(),
2342 Some(0),
2343 "Expected no spills when sorting in place"
2344 );
2345 }
2346
2347 {
2349 let metrics =
2350 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2351
2352 assert_eq!(
2353 metrics.spill_count(),
2354 Some(0),
2355 "Expected no spills when sorting in place"
2356 );
2357 }
2358
2359 {
2361 let metrics =
2362 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2363
2364 assert_eq!(
2365 metrics.spill_count(),
2366 Some(0),
2367 "Expected no spills when sorting in place"
2368 );
2369 }
2370
2371 Ok(())
2372 }
2373
2374 #[tokio::test]
2375 async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch()
2376 -> Result<()> {
2377 let batch_size = 100;
2378
2379 let create_task_ctx = |_: &[RecordBatch]| {
2380 TaskContext::default()
2381 .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2382 };
2383
2384 {
2386 let metrics = test_sort_output_batch_size(
2387 1,
2389 batch_size / 4,
2390 create_task_ctx,
2391 )
2392 .await?;
2393
2394 assert_eq!(
2395 metrics.spill_count(),
2396 Some(0),
2397 "Expected no spills when sorting in place"
2398 );
2399 }
2400
2401 {
2403 let metrics = test_sort_output_batch_size(
2404 1,
2406 batch_size + 7,
2407 create_task_ctx,
2408 )
2409 .await?;
2410
2411 assert_eq!(
2412 metrics.spill_count(),
2413 Some(0),
2414 "Expected no spills when sorting in place"
2415 );
2416 }
2417
2418 {
2420 let metrics = test_sort_output_batch_size(
2421 1,
2423 batch_size * 3,
2424 create_task_ctx,
2425 )
2426 .await?;
2427
2428 assert_eq!(
2429 metrics.spill_count(),
2430 Some(0),
2431 "Expected no spills when sorting in place"
2432 );
2433 }
2434
2435 Ok(())
2436 }
2437
2438 #[tokio::test]
2439 async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill()
2440 -> Result<()> {
2441 let batch_size = 100;
2442
2443 let create_task_ctx = |generated_batches: &[RecordBatch]| {
2444 let batches_memory = generated_batches
2445 .iter()
2446 .map(|b| b.get_array_memory_size())
2447 .sum::<usize>();
2448
2449 TaskContext::default()
2450 .with_session_config(
2451 SessionConfig::new()
2452 .with_batch_size(batch_size)
2453 .with_sort_in_place_threshold_bytes(1)
2455 .with_sort_spill_reservation_bytes(1),
2456 )
2457 .with_runtime(
2458 RuntimeEnvBuilder::default()
2459 .with_memory_limit(batches_memory, 1.0)
2460 .build_arc()
2461 .unwrap(),
2462 )
2463 };
2464
2465 {
2467 let metrics =
2468 test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2469
2470 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2471 }
2472
2473 {
2475 let metrics =
2476 test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2477
2478 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2479 }
2480
2481 {
2483 let metrics =
2484 test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2485
2486 assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2487 }
2488
2489 Ok(())
2490 }
2491
2492 async fn test_sort_output_batch_size(
2493 number_of_batches: usize,
2494 batch_size_to_generate: usize,
2495 create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2496 ) -> Result<MetricsSet> {
2497 let batches = (0..number_of_batches)
2498 .map(|_| make_partition(batch_size_to_generate as i32))
2499 .collect::<Vec<_>>();
2500 let task_ctx = create_task_ctx(batches.as_slice());
2501
2502 let expected_batch_size = task_ctx.session_config().batch_size();
2503
2504 let (mut output_batches, metrics) =
2505 run_sort_on_input(task_ctx, "i", batches).await?;
2506
2507 let last_batch = output_batches.pop().unwrap();
2508
2509 for batch in output_batches {
2510 assert_eq!(batch.num_rows(), expected_batch_size);
2511 }
2512
2513 let mut last_expected_batch_size =
2514 (batch_size_to_generate * number_of_batches) % expected_batch_size;
2515 if last_expected_batch_size == 0 {
2516 last_expected_batch_size = expected_batch_size;
2517 }
2518 assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2519
2520 Ok(metrics)
2521 }
2522
2523 async fn run_sort_on_input(
2524 task_ctx: TaskContext,
2525 order_by_col: &str,
2526 batches: Vec<RecordBatch>,
2527 ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2528 let task_ctx = Arc::new(task_ctx);
2529
2530 let schema = batches[0].schema();
2532 let ordering: LexOrdering = [PhysicalSortExpr {
2533 expr: col(order_by_col, &schema)?,
2534 options: SortOptions {
2535 descending: false,
2536 nulls_first: true,
2537 },
2538 }]
2539 .into();
2540 let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2541 ordering.clone(),
2542 TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2543 ));
2544
2545 let sorted_batches =
2546 collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2547
2548 let metrics = sort_exec.metrics().expect("sort have metrics");
2549
2550 {
2552 let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2553 let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2554
2555 let sorted_batches_concat =
2556 concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2557
2558 assert_eq!(sorted_input_batch, sorted_batches_concat);
2559 }
2560
2561 Ok((sorted_batches, metrics))
2562 }
2563
/// `sort_batch_chunked` splits a sorted 1000-row batch into 250-row chunks
/// that are globally ordered.
///
/// The test is synchronous: nothing here awaits, so no async runtime is needed.
#[test]
fn test_sort_batch_chunked_basic() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Descending input so the sort has real work to do.
    let values: Vec<i32> = (0..1000).rev().collect();

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(values))],
    )?;

    let expressions: LexOrdering =
        [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

    let result_batches = sort_batch_chunked(&batch, &expressions, 250)?;

    // 1000 rows / 250 rows per chunk.
    assert_eq!(result_batches.len(), 4);

    // No chunk may exceed the requested size, and no rows may be lost.
    let mut total_rows = 0;
    for (i, batch) in result_batches.iter().enumerate() {
        assert!(
            batch.num_rows() <= 250,
            "Batch {} has {} rows, expected <= 250",
            i,
            batch.num_rows()
        );
        total_rows += batch.num_rows();
    }

    assert_eq!(total_rows, 1000);

    // Ordering must hold across chunk boundaries.
    let concatenated = concat_batches(&schema, &result_batches)?;
    let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
    for i in 0..array.len() - 1 {
        assert!(
            array.value(i) <= array.value(i + 1),
            "Array not sorted at position {}: {} > {}",
            i,
            array.value(i),
            array.value(i + 1)
        );
    }
    assert_eq!(array.value(0), 0);
    assert_eq!(array.value(array.len() - 1), 999);

    Ok(())
}
2619
/// A 50-row input with a chunk size of 100 comes back as a single sorted
/// chunk.
///
/// The test is synchronous: nothing here awaits, so no async runtime is needed.
#[test]
fn test_sort_batch_chunked_smaller_than_batch_size() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Descending input, smaller than the requested chunk size.
    let values: Vec<i32> = (0..50).rev().collect();
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(values))],
    )?;

    let expressions: LexOrdering =
        [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

    let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;

    assert_eq!(result_batches.len(), 1);
    assert_eq!(result_batches[0].num_rows(), 50);

    let array = as_primitive_array::<Int32Type>(result_batches[0].column(0))?;
    for i in 0..array.len() - 1 {
        assert!(array.value(i) <= array.value(i + 1));
    }
    assert_eq!(array.value(0), 0);
    assert_eq!(array.value(49), 49);

    Ok(())
}
2651
/// When the row count is an exact multiple of the chunk size, every chunk is
/// full and the concatenation is sorted.
///
/// The test is synchronous: nothing here awaits, so no async runtime is needed.
#[test]
fn test_sort_batch_chunked_exact_multiple() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // 1000 descending rows = exactly 10 chunks of 100.
    let values: Vec<i32> = (0..1000).rev().collect();
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(values))],
    )?;

    let expressions: LexOrdering =
        [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

    let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;

    assert_eq!(result_batches.len(), 10);
    for batch in &result_batches {
        assert_eq!(batch.num_rows(), 100);
    }

    let concatenated = concat_batches(&schema, &result_batches)?;
    let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
    for i in 0..array.len() - 1 {
        assert!(array.value(i) <= array.value(i + 1));
    }

    Ok(())
}
2684
/// An empty input batch produces no output chunks at all.
///
/// The test is synchronous: nothing here awaits, so no async runtime is needed.
#[test]
fn test_sort_batch_chunked_empty_batch() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    let batch = RecordBatch::new_empty(Arc::clone(&schema));

    let expressions: LexOrdering =
        [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

    let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;

    assert_eq!(result_batches.len(), 0);

    Ok(())
}
2701
/// A batch built from a 50-row slice of a 1000-row array must be charged
/// fewer reserved bytes than the batch holding the full array.
///
/// The test is synchronous: nothing here awaits, so no async runtime is needed.
#[test]
fn test_get_reserved_bytes_for_record_batch_with_sliced_batches() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
    // Rows 100..150 of the large array.
    let sliced_array = large_array.slice(100, 50);
    let sliced_batch =
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(sliced_array)])?;
    let batch =
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(large_array)])?;

    let sliced_reserved = get_reserved_bytes_for_record_batch(&sliced_batch)?;
    let reserved = get_reserved_bytes_for_record_batch(&batch)?;

    assert!(reserved > sliced_reserved);

    Ok(())
}
2724
/// Replacing the dynamic filter of a TopK sort installs the supplied
/// expression. This is observed through its expression id, which must differ
/// from the one created by `with_fetch`.
#[test]
fn test_with_dynamic_filter() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let ordering = LexOrdering::new(vec![PhysicalSortExpr {
        expr: Arc::new(Column::new("a", 0)),
        options: SortOptions::default(),
    }])
    .unwrap();
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let sort = SortExec::new(ordering, input).with_fetch(Some(10));

    let original_id = sort
        .dynamic_filter_expr()
        .expect("should have dynamic filter with fetch")
        .expression_id()
        .expect("DynamicFilterPhysicalExpr always has an expression_id");

    let replacement = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(Column::new("a", 0)) as _],
        lit(true),
    ));
    let replacement_id = replacement
        .expression_id()
        .expect("DynamicFilterPhysicalExpr always has an expression_id");

    let sort = sort.with_dynamic_filter_expr(Arc::clone(&replacement))?;
    let installed_id = sort
        .dynamic_filter_expr()
        .expect("should still have dynamic filter")
        .expression_id()
        .expect("DynamicFilterPhysicalExpr always has an expression_id");

    assert_eq!(installed_id, replacement_id);
    assert_ne!(installed_id, original_id);
    Ok(())
}
2765
/// `with_dynamic_filter_expr` must reject a replacement filter whose
/// children reference a column (`bad`, index 99) that the sort's
/// single-column input schema does not have.
#[test]
fn test_with_dynamic_filter_rejects_invalid_columns() -> Result<()> {
    // Reuse the shared fixture: sort on `a` over an empty input, fetch = 10.
    let sort = make_sort_exec_with_fetch(Some(10));

    let df = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::new(Column::new("bad", 99)) as _],
        lit(true),
    ));
    assert!(sort.with_dynamic_filter_expr(df).is_err());
    Ok(())
}
2789
2790 #[tokio::test]
2807 async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> {
2808 let sort_spill_reservation_bytes: usize = 10 * 1024; let sort_working_memory: usize = 40 * 1024; let pool_size = sort_spill_reservation_bytes + sort_working_memory;
2814 let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(pool_size));
2815
2816 let runtime = RuntimeEnvBuilder::new()
2817 .with_memory_pool(Arc::clone(&pool))
2818 .build_arc()?;
2819
2820 let metrics_set = ExecutionPlanMetricsSet::new();
2821 let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
2822
2823 let mut sorter = ExternalSorter::new(
2824 0,
2825 Arc::clone(&schema),
2826 [PhysicalSortExpr::new_default(Arc::new(Column::new("x", 0)))].into(),
2827 128, sort_spill_reservation_bytes,
2829 usize::MAX, SpillCompression::Uncompressed,
2831 &metrics_set,
2832 Arc::clone(&runtime),
2833 )?;
2834
2835 let num_batches = 200;
2837 for i in 0..num_batches {
2838 let values: Vec<i32> = ((i * 100)..((i + 1) * 100)).rev().collect();
2839 let batch = RecordBatch::try_new(
2840 Arc::clone(&schema),
2841 vec![Arc::new(Int32Array::from(values))],
2842 )?;
2843 sorter.insert_batch(batch).await?;
2844 }
2845
2846 assert!(
2847 sorter.spilled_before(),
2848 "Test requires spilling to exercise the merge path"
2849 );
2850
2851 assert!(
2853 sorter.merge_reservation_size() >= sort_spill_reservation_bytes,
2854 "merge_reservation should hold the pre-reserved bytes before sort()"
2855 );
2856
2857 let merge_stream = sorter.sort().await?;
2863
2864 assert_eq!(
2869 sorter.merge_reservation_size(),
2870 0,
2871 "After sort(), merge_reservation should be 0 (bytes transferred \
2872 to merge stream via take()). If non-zero, the bytes are still \
2873 held by the sorter and will be freed on drop, allowing other \
2874 partitions to steal them."
2875 );
2876
2877 drop(sorter);
2879
2880 let contender = MemoryConsumer::new("CompetingPartition").register(&pool);
2885 let available = pool_size.saturating_sub(pool.reserved());
2886 if available > 0 {
2887 contender.try_grow(available).unwrap();
2888 }
2889
2890 let batches: Vec<RecordBatch> = merge_stream.try_collect().await?;
2895 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2896 assert_eq!(
2897 total_rows,
2898 (num_batches * 100) as usize,
2899 "Merge stream should produce all rows even under memory contention"
2900 );
2901
2902 let merged = concat_batches(&schema, &batches)?;
2904 let col = merged.column(0).as_primitive::<Int32Type>();
2905 for i in 1..col.len() {
2906 assert!(
2907 col.value(i - 1) <= col.value(i),
2908 "Output should be sorted, but found {} > {} at index {}",
2909 col.value(i - 1),
2910 col.value(i),
2911 i
2912 );
2913 }
2914
2915 drop(contender);
2916 Ok(())
2917 }
2918
2919 fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
2920 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2921 let input = Arc::new(EmptyExec::new(schema));
2922 SortExec::new(
2923 [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
2924 input,
2925 )
2926 .with_fetch(fetch)
2927 }
2928
/// In the Pre phase, a sort carrying a fetch limit must not push a parent
/// filter down to its child.
#[test]
fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
    let limited_sort = make_sort_exec_with_fetch(Some(10));
    let config = ConfigOptions::new();

    let description = limited_sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![Arc::new(Column::new("a", 0))],
        &config,
    )?;

    let was_blocked = matches!(
        description.parent_filters()[0][0].discriminant,
        PushedDown::No
    );
    assert!(was_blocked);
    Ok(())
}
2944
/// In the Pre phase, a sort without a fetch limit lets a parent filter
/// through to its child.
#[test]
fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
    let unlimited_sort = make_sort_exec_with_fetch(None);
    let config = ConfigOptions::new();

    let description = unlimited_sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![Arc::new(Column::new("a", 0))],
        &config,
    )?;

    let was_pushed = matches!(
        description.parent_filters()[0][0].discriminant,
        PushedDown::Yes
    );
    assert!(was_pushed);
    Ok(())
}
2960
/// With a fetch limit the sort creates a TopK dynamic filter. In the Post
/// phase, with `enable_topk_dynamic_filter_pushdown` on, parent filters stay
/// blocked, while the sort itself contributes exactly one self filter at
/// child index 0.
#[test]
fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
    let topk_sort = make_sort_exec_with_fetch(Some(10));
    assert!(topk_sort.filter.is_some(), "TopK filter should be created");

    let config = {
        let mut options = ConfigOptions::new();
        options.optimizer.enable_topk_dynamic_filter_pushdown = true;
        options
    };

    let description = topk_sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![Arc::new(Column::new("a", 0))],
        &config,
    )?;

    let parent_blocked = matches!(
        description.parent_filters()[0][0].discriminant,
        PushedDown::No
    );
    assert!(parent_blocked);

    let own_filter_count = description.self_filters()[0].len();
    assert_eq!(own_filter_count, 1);
    Ok(())
}
2982}