use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use parking_lot::RwLock;
use crate::common::spawn_buffered;
use crate::execution_plan::{
Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
};
use crate::expressions::PhysicalSortExpr;
use crate::filter_pushdown::{
ChildFilterDescription, FilterDescription, FilterPushdownPhase,
};
use crate::limit::LimitStream;
use crate::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
};
use crate::projection::{ProjectionExec, make_with_child, update_ordering};
use crate::sorts::IncrementalSortIterator;
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::spill::get_record_batch_memory_size;
use crate::spill::in_progress_spill_file::InProgressSpillFile;
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
use crate::stream::RecordBatchStreamAdapter;
use crate::stream::ReservationStream;
use crate::topk::TopK;
use crate::topk::TopKDynamicFilters;
use crate::{
DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
Statistics,
};
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
use datafusion_common::{
DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
unwrap_or_internal_err,
};
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};
/// Metrics recorded by [`ExternalSorter`]: common baseline execution metrics
/// plus spill-specific counters (spill file count, spilled bytes and rows).
struct ExternalSorterMetrics {
    /// Common metrics (elapsed compute time, output rows, ...) for the sort.
    baseline: BaselineMetrics,
    /// Counters describing spill-to-disk activity.
    spill_metrics: SpillMetrics,
}
impl ExternalSorterMetrics {
    /// Builds the metric set for one partition of the external sort,
    /// registering both baseline and spill metrics against `metrics`.
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let baseline = BaselineMetrics::new(metrics, partition);
        let spill_metrics = SpillMetrics::new(metrics, partition);
        Self {
            baseline,
            spill_metrics,
        }
    }
}
/// Sorts arbitrarily many incoming record batches, spilling sorted runs to
/// disk under memory pressure, and produces a single sorted output stream.
/// See `insert_batch` and `sort` in the `impl` below.
struct ExternalSorter {
    /// Schema of the batches being sorted.
    schema: SchemaRef,
    /// Sort expressions defining the output ordering.
    expr: LexOrdering,
    /// Target number of rows per output batch.
    batch_size: usize,
    /// Below this reservation size, buffered batches are concatenated and
    /// sorted in a single pass rather than sorted per-batch and merged
    /// (see `in_mem_sort_stream`).
    sort_in_place_threshold_bytes: usize,
    /// Unsorted input batches buffered in memory.
    in_mem_batches: Vec<RecordBatch>,
    /// Spill file currently being written, together with the maximum sliced
    /// size of any batch appended to it so far.
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    /// Completed spill files awaiting the final merge.
    finished_spill_files: Vec<SortedSpillFile>,
    /// Execution + spill metrics.
    metrics: ExternalSorterMetrics,
    /// Runtime environment (memory pool, disk manager).
    runtime: Arc<RuntimeEnv>,
    /// Memory reserved for buffered input batches (spillable).
    reservation: MemoryReservation,
    /// Creates and tracks spill files.
    spill_manager: SpillManager,
    /// Memory reserved up-front for the merge phase.
    merge_reservation: MemoryReservation,
    /// Size in bytes to keep reserved for merging while spilling is possible
    /// (see `reserve_memory_for_merge`).
    sort_spill_reservation_bytes: usize,
}
impl ExternalSorter {
    /// Creates an external sorter for `partition_id`.
    ///
    /// Registers two consumers with the runtime's memory pool: a spillable
    /// reservation for buffered input batches and a separate reservation for
    /// the merge phase (sized later by [`Self::reserve_memory_for_merge`]).
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        spill_compression: SpillCompression,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<Self> {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        // Input-batch reservation is marked spillable: under memory pressure
        // this consumer may free memory by spilling batches to disk.
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);
        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);
        let spill_manager = SpillManager::new(
            Arc::clone(&runtime),
            metrics.spill_metrics.clone(),
            Arc::clone(&schema),
        )
        .with_compression_type(spill_compression);
        Ok(Self {
            schema,
            in_mem_batches: vec![],
            in_progress_spill_file: None,
            finished_spill_files: vec![],
            expr,
            metrics,
            reservation,
            spill_manager,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        })
    }

    /// Buffers `input` in memory, reserving memory for it first; may spill
    /// already-buffered batches if the reservation cannot grow. Empty
    /// batches are ignored.
    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
        if input.num_rows() == 0 {
            return Ok(());
        }
        self.reserve_memory_for_merge()?;
        self.reserve_memory_for_batch_and_maybe_spill(&input)
            .await?;
        self.in_mem_batches.push(input);
        Ok(())
    }

    /// True if at least one spill file has been completed so far.
    fn spilled_before(&self) -> bool {
        !self.finished_spill_files.is_empty()
    }

    /// Produces the fully sorted output stream.
    ///
    /// If data was spilled, any remaining in-memory batches are sorted and
    /// spilled as well, and all spill files are merged into one stream;
    /// otherwise the buffered batches are sorted entirely in memory.
    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
        // Release the up-front merge reservation; the merge below gets a
        // fresh reservation of its own (`new_empty`).
        self.merge_reservation.free();
        if self.spilled_before() {
            if !self.in_mem_batches.is_empty() {
                self.sort_and_spill_in_mem_batches().await?;
            }
            StreamingMergeBuilder::new()
                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
                .with_spill_manager(self.spill_manager.clone())
                .with_schema(Arc::clone(&self.schema))
                // NOTE(review): the clone appears to avoid holding a borrow
                // of `self.expr` across the builder chain — confirm.
                .with_expressions(&self.expr.clone())
                .with_metrics(self.metrics.baseline.clone())
                .with_batch_size(self.batch_size)
                .with_fetch(None)
                .with_reservation(self.merge_reservation.new_empty())
                .build()
        } else {
            self.in_mem_sort_stream(self.metrics.baseline.clone())
        }
    }

    /// Bytes currently reserved for buffered input batches.
    fn used(&self) -> usize {
        self.reservation.size()
    }

    /// Total bytes spilled to disk so far (metric value).
    fn spilled_bytes(&self) -> usize {
        self.metrics.spill_metrics.spilled_bytes.value()
    }

    /// Total rows spilled to disk so far (metric value).
    fn spilled_rows(&self) -> usize {
        self.metrics.spill_metrics.spilled_rows.value()
    }

    /// Number of spill files created so far (metric value).
    fn spill_count(&self) -> usize {
        self.metrics.spill_metrics.spill_file_count.value()
    }

    /// Appends `globally_sorted_batches` to the in-progress spill file
    /// (creating it first if needed), consuming the vector.
    ///
    /// Also tracks the maximum sliced batch size written to the file so far;
    /// it is stored alongside the finished file for the later merge.
    async fn consume_and_spill_append(
        &mut self,
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        if globally_sorted_batches.is_empty() {
            return Ok(());
        }
        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
        }
        // Compact StringView columns (`gc`) before writing so spill files do
        // not carry unreferenced buffer bytes.
        Self::organize_stringview_arrays(globally_sorted_batches)?;
        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
        let batches_to_spill = std::mem::take(globally_sorted_batches);
        // The batches are moving to disk, so release the memory reserved for
        // them. NOTE(review): this frees the *entire* reservation, not only
        // these batches' share — confirm nothing else is accounted here.
        self.reservation.free();
        let (in_progress_file, max_record_batch_size) =
            self.in_progress_spill_file.as_mut().ok_or_else(|| {
                internal_datafusion_err!("In-progress spill file should be initialized")
            })?;
        for batch in batches_to_spill {
            in_progress_file.append_batch(&batch)?;
            // Remember the largest batch written to this file.
            *max_record_batch_size =
                (*max_record_batch_size).max(batch.get_sliced_size()?);
        }
        assert_or_internal_err!(
            globally_sorted_batches.is_empty(),
            "This function consumes globally_sorted_batches, so it should be empty after taking."
        );
        Ok(())
    }

    /// Finalizes the in-progress spill file and records it in
    /// `finished_spill_files`. Must follow `consume_and_spill_append`.
    async fn spill_finish(&mut self) -> Result<()> {
        let (mut in_progress_file, max_record_batch_memory) =
            self.in_progress_spill_file.take().ok_or_else(|| {
                internal_datafusion_err!("Should be called after `spill_append`")
            })?;
        let spill_file = in_progress_file.finish()?;
        // `finish` may return `None`; only record an actual file.
        if let Some(spill_file) = spill_file {
            self.finished_spill_files.push(SortedSpillFile {
                file: spill_file,
                max_record_batch_memory,
            });
        }
        Ok(())
    }

    /// Rewrites any `StringViewArray` columns via `gc` so their data buffers
    /// contain only referenced bytes, replacing each batch in place.
    /// Batches without StringView columns are passed through untouched.
    fn organize_stringview_arrays(
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
        for batch in globally_sorted_batches.drain(..) {
            let mut new_columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(batch.num_columns());
            // Track whether any column was rewritten so untouched batches
            // can be reused as-is.
            let mut arr_mutated = false;
            for array in batch.columns() {
                if let Some(string_view_array) =
                    array.as_any().downcast_ref::<StringViewArray>()
                {
                    let new_array = string_view_array.gc();
                    new_columns.push(Arc::new(new_array));
                    arr_mutated = true;
                } else {
                    new_columns.push(Arc::clone(array));
                }
            }
            let organized_batch = if arr_mutated {
                RecordBatch::try_new(batch.schema(), new_columns)?
            } else {
                batch
            };
            organized_batches.push(organized_batch);
        }
        *globally_sorted_batches = organized_batches;
        Ok(())
    }

    /// Sorts all buffered in-memory batches and writes the sorted output to
    /// a new spill file, flushing to disk whenever the reservation cannot
    /// hold another sorted batch. Restores the merge reservation afterwards.
    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
        assert_or_internal_err!(
            !self.in_mem_batches.is_empty(),
            "in_mem_batches must not be empty when attempting to sort and spill"
        );
        // Temporarily free merge-phase memory so sorting can use it.
        self.merge_reservation.free();
        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
        assert_or_internal_err!(
            self.in_mem_batches.is_empty(),
            "in_mem_batches should be empty after constructing sorted stream"
        );
        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
            // If the sorted batch no longer fits in memory, flush everything
            // buffered so far (including this batch) to the spill file.
            if self.reservation.try_grow(sorted_size).is_err() {
                globally_sorted_batches.push(batch);
                self.consume_and_spill_append(&mut globally_sorted_batches)
                    .await?;
            } else {
                globally_sorted_batches.push(batch);
            }
        }
        // NOTE(review): the stream is dropped before the final spill append,
        // presumably to release any reservations it still holds — confirm.
        drop(sorted_stream);
        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?;
        self.spill_finish().await?;
        let buffers_cleared_property =
            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
        assert_or_internal_err!(
            buffers_cleared_property,
            "in_mem_batches and globally_sorted_batches should be cleared before"
        );
        // Re-establish the merge-phase reservation that was freed above.
        self.reserve_memory_for_merge()?;
        Ok(())
    }

    /// Sorts the buffered in-memory batches into a single sorted stream,
    /// consuming `self.in_mem_batches`.
    ///
    /// Three strategies, by increasing cost:
    /// 1. exactly one buffered batch: sort it directly;
    /// 2. reservation below `sort_in_place_threshold_bytes`: concatenate all
    ///    batches and sort the result in one pass;
    /// 3. otherwise: sort each batch independently and merge the resulting
    ///    sorted streams.
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }
        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();
        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            // Hand the entire reservation to the stream (see
            // `sort_batch_stream`).
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }
        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            // Resize the reservation to the concatenated batch's footprint.
            self.reservation
                .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
                .map_err(Self::err_with_oom_context)?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }
        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                // Carve this batch's share out of the main reservation so
                // each sorted stream owns its own memory accounting.
                let reservation = self
                    .reservation
                    .split(get_reserved_bytes_for_record_batch(&batch)?);
                let input = self.sort_batch_stream(batch, &metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;
        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(&self.expr.clone())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(None)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }

    /// Sorts a single batch lazily: returns a stream that, on first poll,
    /// sorts `batch` into `batch_size`-row chunks and yields them, with
    /// `reservation` tracking the sorted data's memory.
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: &BaselineMetrics,
        reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        // Invariant: the reservation handed in must exactly cover `batch`.
        assert_eq!(
            get_reserved_bytes_for_record_batch(&batch)?,
            reservation.size()
        );
        let schema = batch.schema();
        let expressions = self.expr.clone();
        let batch_size = self.batch_size;
        let output_row_metrics = metrics.output_rows().clone();
        // The actual sort work runs inside the stream, when it is polled.
        let stream = futures::stream::once(async move {
            let schema = batch.schema();
            let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
            // Adjust the reservation from the input batch's footprint to the
            // sorted output's footprint.
            let total_sorted_size: usize = sorted_batches
                .iter()
                .map(get_record_batch_memory_size)
                .sum();
            reservation
                .try_resize(total_sorted_size)
                .map_err(Self::err_with_oom_context)?;
            // NOTE(review): `ReservationStream` presumably ties the
            // reservation's lifetime to the stream — see its definition.
            Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
                Arc::clone(&schema),
                Box::pin(RecordBatchStreamAdapter::new(
                    Arc::clone(&schema),
                    futures::stream::iter(sorted_batches.into_iter().map(Ok)),
                )),
                reservation,
            )) as SendableRecordBatchStream)
        })
        .try_flatten()
        .map(move |batch| match batch {
            Ok(batch) => {
                // Count output rows as they flow through.
                output_row_metrics.add(batch.num_rows());
                Ok(batch)
            }
            Err(e) => Err(e),
        });
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    /// Ensures `merge_reservation` holds `sort_spill_reservation_bytes`.
    /// NOTE(review): maintained only while temp files are enabled —
    /// presumably because without spilling there is no spill-file merge to
    /// reserve memory for; confirm.
    fn reserve_memory_for_merge(&mut self) -> Result<()> {
        if self.runtime.disk_manager.tmp_files_enabled() {
            let size = self.sort_spill_reservation_bytes;
            if self.merge_reservation.size() != size {
                self.merge_reservation
                    .try_resize(size)
                    .map_err(Self::err_with_oom_context)?;
            }
        }
        Ok(())
    }

    /// Grows the input reservation by `input`'s footprint. On failure, sorts
    /// and spills the currently buffered batches and retries once.
    async fn reserve_memory_for_batch_and_maybe_spill(
        &mut self,
        input: &RecordBatch,
    ) -> Result<()> {
        let size = get_reserved_bytes_for_record_batch(input)?;
        match self.reservation.try_grow(size) {
            Ok(_) => Ok(()),
            Err(e) => {
                // Nothing buffered to spill, so spilling cannot help;
                // surface the error with the OOM hint attached.
                if self.in_mem_batches.is_empty() {
                    return Err(Self::err_with_oom_context(e));
                }
                self.sort_and_spill_in_mem_batches().await?;
                self.reservation
                    .try_grow(size)
                    .map_err(Self::err_with_oom_context)
            }
        }
    }

    /// Wraps resource-exhaustion errors with a hint naming the relevant
    /// configuration options; all other errors pass through unchanged.
    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
        match e {
            DataFusionError::ResourcesExhausted(_) => e.context(
                "Not enough memory to continue external sort. \
                Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', \
                or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'."
            ),
            _ => e,
        }
    }
}
/// Returns the number of bytes to reserve for a record batch, given its
/// in-memory size and its sliced size.
///
/// The two components are combined with `saturating_add` so a pathological
/// pair of sizes cannot overflow `usize` (debug-mode panic / release-mode
/// wraparound). Saturating at `usize::MAX` is safe here: the value is a
/// reservation estimate, and an oversized reservation simply fails to grow.
pub(crate) fn get_reserved_bytes_for_record_batch_size(
    record_batch_size: usize,
    sliced_size: usize,
) -> usize {
    record_batch_size.saturating_add(sliced_size)
}
/// Computes the bytes that must be reserved for `batch`: its in-memory
/// footprint combined with its sliced size. Errors from computing the
/// sliced size are propagated.
pub(crate) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result<usize> {
    let sliced_size = batch.get_sliced_size()?;
    let memory_size = get_record_batch_memory_size(batch);
    Ok(get_reserved_bytes_for_record_batch_size(
        memory_size,
        sliced_size,
    ))
}
impl Debug for ExternalSorter {
    /// Formats a summary of the sorter: current reserved memory and the
    /// cumulative spill statistics. Buffered batch contents are omitted.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("ExternalSorter")
            .field("memory_used", &self.used())
            .field("spilled_bytes", &self.spilled_bytes())
            .field("spilled_rows", &self.spilled_rows())
            .field("spill_count", &self.spill_count())
            .finish()
    }
}
/// Sorts `batch` by `expressions`, optionally keeping only the first `fetch`
/// rows of the sorted result.
pub fn sort_batch(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    fetch: Option<usize>,
) -> Result<RecordBatch> {
    // Evaluate each sort expression against the batch to obtain the sort
    // columns (values plus sort options) that drive the ordering.
    let mut sort_columns = Vec::with_capacity(expressions.len());
    for expr in expressions.iter() {
        sort_columns.push(expr.evaluate_to_sort_column(batch)?);
    }
    // Compute the sorted row permutation (truncated to `fetch` rows when
    // given) and gather every column of the batch accordingly.
    let indices = lexsort_to_indices(&sort_columns, fetch)?;
    let sorted_columns = take_arrays(batch.columns(), &indices, None)?;
    // Carry the resulting row count explicitly in the batch options.
    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
    let sorted = RecordBatch::try_new_with_options(batch.schema(), sorted_columns, &options)?;
    Ok(sorted)
}
/// Sorts `batch` by `expressions` incrementally, returning the sorted data
/// as a sequence of chunks of at most `batch_size` rows each.
pub fn sort_batch_chunked(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    batch_size: usize,
) -> Result<Vec<RecordBatch>> {
    // Collecting the iterator propagates the first error, if any.
    let sorted_chunks =
        IncrementalSortIterator::new(batch.clone(), expressions.clone(), batch_size);
    sorted_chunks.collect()
}
/// Physical operator that sorts its input by a lexicographic ordering,
/// optionally limited to the first `fetch` rows (TopK).
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input plan whose output is sorted.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions defining the requested output ordering.
    expr: LexOrdering,
    /// Execution metrics shared across partitions.
    metrics_set: ExecutionPlanMetricsSet,
    /// When true, each input partition is sorted independently; otherwise a
    /// single globally sorted output partition is produced.
    preserve_partitioning: bool,
    /// Optional row limit: when set, only the top `fetch` rows are produced
    /// (TopK execution path).
    fetch: Option<usize>,
    /// Prefix of `expr` that the input's existing ordering already
    /// satisfies (see `compute_properties`).
    common_sort_prefix: Vec<PhysicalSortExpr>,
    /// Cached plan properties (equivalences, partitioning, emission type,
    /// boundedness).
    cache: Arc<PlanProperties>,
    /// Dynamic filters maintained by the TopK operator; present only when
    /// `fetch` is set (see `with_fetch`).
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
impl SortExec {
    /// Creates a sort producing a single globally sorted output partition
    /// (use [`Self::with_preserve_partitioning`] to sort each input
    /// partition independently instead).
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let preserve_partitioning = false;
        // NOTE(review): property computation is treated as infallible here
        // (`unwrap`) — confirm invalid orderings cannot reach this point.
        let (cache, sort_prefix) =
            Self::compute_properties(&input, expr.clone(), preserve_partitioning)
                .unwrap();
        Self {
            expr,
            input,
            metrics_set: ExecutionPlanMetricsSet::new(),
            preserve_partitioning,
            fetch: None,
            common_sort_prefix: sort_prefix,
            cache: Arc::new(cache),
            filter: None,
        }
    }

    /// Whether each input partition is sorted independently.
    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    /// Sets partition-preservation mode and updates the cached output
    /// partitioning accordingly.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        Arc::make_mut(&mut self.cache).partitioning =
            Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
        self
    }

    /// Builds an (initially pass-through, `lit(true)`) dynamic filter over
    /// the sort expressions' columns, used by the TopK execution path.
    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
        let children = self
            .expr
            .iter()
            .map(|sort_expr| Arc::clone(&sort_expr.expr))
            .collect::<Vec<_>>();
        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(children, lit(true)),
        ))))
    }

    /// Copy of self sharing input, cache, metrics, and filter handles.
    fn cloned(&self) -> Self {
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            common_sort_prefix: self.common_sort_prefix.clone(),
            fetch: self.fetch,
            cache: Arc::clone(&self.cache),
            filter: self.filter.clone(),
        }
    }

    /// Returns a copy of this plan with the given row limit (TopK).
    ///
    /// Setting a limit can make an otherwise unbounded sort bounded, and
    /// ensures a dynamic filter exists for the TopK operator.
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = PlanProperties::clone(&self.cache);
        // A sort that emits incrementally and stops after `fetch` rows
        // produces bounded output even on unbounded input.
        let is_pipeline_friendly = matches!(
            cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        // Reuse an existing filter if present; otherwise create one. No
        // filter is kept when the fetch is removed.
        let filter = fetch.is_some().then(|| {
            self.filter.clone().unwrap_or_else(|| self.create_filter())
        });
        let mut new_sort = self.cloned();
        new_sort.fetch = fetch;
        new_sort.cache = cache.into();
        new_sort.filter = filter;
        new_sort
    }

    /// Input execution plan.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions.
    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    /// Optional row limit (TopK).
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Output partitioning: the input's when preserving partitions,
    /// otherwise a single output partition.
    fn output_partitioning_helper(
        input: &Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Partitioning {
        if preserve_partitioning {
            input.output_partitioning().clone()
        } else {
            Partitioning::UnknownPartitioning(1)
        }
    }

    /// Computes the plan's cached properties plus the prefix of `sort_exprs`
    /// already satisfied by the input's ordering.
    ///
    /// If the input fully satisfies the requested ordering, the sort is a
    /// pass-through and keeps the input's emission type and boundedness.
    /// Otherwise the sort must consume all input before emitting (`Final`),
    /// and an unbounded input would require infinite memory.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
        let (sort_prefix, sort_satisfied) = input
            .equivalence_properties()
            .extract_common_sort_prefix(sort_exprs.clone())?;
        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };
        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };
        // The output's equivalences reflect the requested sort ordering.
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.reorder(sort_exprs)?;
        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);
        Ok((
            PlanProperties::new(
                eq_properties,
                output_partitioning,
                emission_type,
                boundedness,
            ),
            sort_prefix,
        ))
    }
}
impl DisplayAs for SortExec {
    /// Renders the operator for `EXPLAIN` output.
    ///
    /// Default/Verbose: a single line labelled `SortExec: TopK(...)` when a
    /// fetch is set, including the current dynamic filter (when present and
    /// not the trivial `true`) and any already-satisfied sort prefix.
    /// TreeRender: the expressions, then the limit (when present) on its own
    /// line.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let preserve_partitioning = self.preserve_partitioning;
                match self.fetch {
                    Some(fetch) => {
                        write!(
                            f,
                            "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                            self.expr
                        )?;
                        // Only show the filter once it has been tightened
                        // beyond the initial `lit(true)` placeholder.
                        if let Some(filter) = &self.filter
                            && let Ok(current) = filter.read().expr().current()
                            && !current.eq(&lit(true))
                        {
                            write!(f, ", filter=[{current}]")?;
                        }
                        if !self.common_sort_prefix.is_empty() {
                            write!(f, ", sort_prefix=[")?;
                            // Comma-separate the prefix expressions.
                            let mut first = true;
                            for sort_expr in &self.common_sort_prefix {
                                if first {
                                    first = false;
                                } else {
                                    write!(f, ", ")?;
                                }
                                write!(f, "{sort_expr}")?;
                            }
                            write!(f, "]")
                        } else {
                            Ok(())
                        }
                    }
                    None => write!(
                        f,
                        "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                        self.expr
                    ),
                }
            }
            DisplayFormatType::TreeRender => match self.fetch {
                Some(fetch) => {
                    writeln!(f, "{}", self.expr)?;
                    writeln!(f, "limit={fetch}")
                }
                None => {
                    writeln!(f, "{}", self.expr)
                }
            },
        }
    }
}
impl ExecutionPlan for SortExec {
    /// Reported name distinguishes the TopK variant (fetch set).
    fn name(&self) -> &'static str {
        match self.fetch {
            Some(_) => "SortExec(TopK)",
            None => "SortExec",
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// When producing a single globally sorted output, the input must be a
    /// single partition; when preserving partitions, any distribution works.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    /// Rebinds to a new child, recomputing cached properties and the common
    /// sort prefix only when the child's properties actually changed.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut new_sort = self.cloned();
        assert_eq!(children.len(), 1, "SortExec should have exactly one child");
        new_sort.input = Arc::clone(&children[0]);
        if !has_same_children_properties(self.as_ref(), &children)? {
            let (cache, sort_prefix) = Self::compute_properties(
                &new_sort.input,
                new_sort.expr.clone(),
                new_sort.preserve_partitioning,
            )?;
            new_sort.cache = Arc::new(cache);
            new_sort.common_sort_prefix = sort_prefix;
        }
        Ok(Arc::new(new_sort))
    }

    /// Returns a copy with per-execution state (dynamic filter and metrics)
    /// reset to fresh instances.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        let new_sort = self.with_new_children(children)?;
        let mut new_sort = new_sort
            .as_any()
            .downcast_ref::<SortExec>()
            .expect("cloned 1 lines above this line, we know the type")
            .clone();
        new_sort.filter = Some(new_sort.create_filter());
        new_sort.metrics_set = ExecutionPlanMetricsSet::new();
        Ok(Arc::new(new_sort))
    }

    /// Executes one partition.
    ///
    /// Chooses among four strategies based on whether the input already
    /// satisfies the requested ordering and whether a fetch (TopK) is set:
    /// - sorted input + fetch: just limit the input stream;
    /// - sorted input, no fetch: pass the input stream through unchanged;
    /// - unsorted + fetch: run the TopK operator;
    /// - unsorted, no fetch: full (possibly spilling) external sort.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start SortExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let mut input = self.input.execute(partition, Arc::clone(&context))?;
        let execution_options = &context.session_config().options().execution;
        trace!("End SortExec's input.execute for partition: {partition}");
        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy(self.expr.clone())?;
        match (sort_satisfied, self.fetch.as_ref()) {
            // Input already sorted and limited: only apply the limit.
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            // Input already sorted: nothing to do.
            (true, None) => Ok(input),
            // TopK: keep only the top `fetch` rows while streaming input.
            (false, Some(fetch)) => {
                let filter = self.filter.clone();
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.common_sort_prefix.clone(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                    Arc::clone(&unwrap_or_internal_err!(filter)),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                            // Stop consuming input once TopK reports it is
                            // finished.
                            if topk.finished {
                                break;
                            }
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            // Full external sort, spilling to disk under memory pressure.
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    context.session_config().spill_compression(),
                    &self.metrics_set,
                    context.runtime_env(),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort().await
                    })
                    .try_flatten(),
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    /// Statistics for `partition`, adjusted for the fetch limit. Without
    /// partition preservation the output is a single partition, so the
    /// input's aggregate statistics are used regardless of `partition`.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if !self.preserve_partitioning() {
            return self
                .input
                .partition_statistics(None)?
                .with_fetch(self.fetch, 0, 1);
        }
        self.input
            .partition_statistics(partition)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// A plain sort reorders rows without changing their count; a fetch can
    /// only reduce it.
    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.fetch.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    /// Attempts to push `projection` below this sort, rewriting the sort
    /// expressions in terms of the projection's output. Only attempted when
    /// the projection narrows the schema and every sort expression can be
    /// rewritten.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Projection does not narrow the schema — keep operators as-is.
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }
        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
        else {
            return Ok(None);
        };
        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }

    /// Describes filter pushdown through this sort.
    ///
    /// Parent filters pass through only for a plain sort; a TopK (fetch set)
    /// blocks them, since filtering below the limit would change which rows
    /// survive. In the `Post` phase, the TopK's own dynamic filter is
    /// additionally offered for pushdown when enabled in the config.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &datafusion_common::config::ConfigOptions,
    ) -> Result<FilterDescription> {
        if phase != FilterPushdownPhase::Post {
            if self.fetch.is_some() {
                return Ok(FilterDescription::all_unsupported(
                    &parent_filters,
                    &self.children(),
                ));
            }
            return FilterDescription::from_children(parent_filters, &self.children());
        }
        let mut child = if self.fetch.is_some() {
            ChildFilterDescription::all_unsupported(&parent_filters)
        } else {
            ChildFilterDescription::from_child(&parent_filters, self.input())?
        };
        if let Some(filter) = &self.filter
            && config.optimizer.enable_topk_dynamic_filter_pushdown
        {
            child = child.with_self_filter(filter.read().expr());
        }
        Ok(FilterDescription::new().with_child(child))
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::collect;
use crate::empty::EmptyExec;
use crate::execution_plan::Boundedness;
use crate::expressions::col;
use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
use crate::test;
use crate::test::TestMemoryExec;
use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
use crate::test::{assert_is_pending, make_partition};
use arrow::array::*;
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_execution::RecordBatchStream;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::expressions::{Column, Literal};
use futures::{FutureExt, Stream};
use insta::assert_snapshot;
#[derive(Debug, Clone)]
pub struct SortedUnboundedExec {
schema: Schema,
batch_size: u64,
cache: Arc<PlanProperties>,
}
impl DisplayAs for SortedUnboundedExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default
| DisplayFormatType::Verbose
| DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
}
Ok(())
}
}
impl SortedUnboundedExec {
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let mut eq_properties = EquivalenceProperties::new(schema);
eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
Column::new("c1", 0),
))]);
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Unbounded {
requires_infinite_memory: false,
},
)
}
}
impl ExecutionPlan for SortedUnboundedExec {
fn name(&self) -> &'static str {
Self::static_name()
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(SortedUnboundedStream {
schema: Arc::new(self.schema.clone()),
batch_size: self.batch_size,
offset: 0,
}))
}
}
#[derive(Debug)]
pub struct SortedUnboundedStream {
schema: SchemaRef,
batch_size: u64,
offset: u64,
}
impl Stream for SortedUnboundedStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let batch = SortedUnboundedStream::create_record_batch(
Arc::clone(&self.schema),
self.offset,
self.batch_size,
);
self.offset += self.batch_size;
Poll::Ready(Some(Ok(batch)))
}
}
impl RecordBatchStream for SortedUnboundedStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
impl SortedUnboundedStream {
fn create_record_batch(
schema: SchemaRef,
offset: u64,
batch_size: u64,
) -> RecordBatch {
let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
let array = UInt64Array::from(values);
let array_ref: ArrayRef = Arc::new(array);
RecordBatch::try_new(schema, vec![array_ref]).unwrap()
}
}
#[tokio::test]
async fn test_in_mem_sort() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let partitions = 4;
let csv = test::scan_partitioned(partitions);
let schema = csv.schema();
let sort_exec = Arc::new(SortExec::new(
[PhysicalSortExpr {
expr: col("i", &schema)?,
options: SortOptions::default(),
}]
.into(),
Arc::new(CoalescePartitionsExec::new(csv)),
));
let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
assert_eq!(result.len(), 1);
assert_eq!(result[0].num_rows(), 400);
assert_eq!(
task_ctx.runtime_env().memory_pool.reserved(),
0,
"The sort should have returned all memory used back to the memory manager"
);
Ok(())
}
#[tokio::test]
async fn test_sort_spill() -> Result<()> {
let session_config = SessionConfig::new();
let sort_spill_reservation_bytes = session_config
.options()
.execution
.sort_spill_reservation_bytes;
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
.build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_session_config(session_config)
.with_runtime(runtime),
);
let partitions = 100;
let input = test::scan_partitioned(partitions);
let schema = input.schema();
let sort_exec = Arc::new(SortExec::new(
[PhysicalSortExpr {
expr: col("i", &schema)?,
options: SortOptions::default(),
}]
.into(),
Arc::new(CoalescePartitionsExec::new(input)),
));
let result = collect(
Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
Arc::clone(&task_ctx),
)
.await?;
assert_eq!(result.len(), 2);
let metrics = sort_exec.metrics().unwrap();
assert_eq!(metrics.output_rows().unwrap(), 10000);
assert!(metrics.elapsed_compute().unwrap() > 0);
let spill_count = metrics.spill_count().unwrap();
let spilled_rows = metrics.spilled_rows().unwrap();
let spilled_bytes = metrics.spilled_bytes().unwrap();
assert!((3..=10).contains(&spill_count));
assert!((9000..=10000).contains(&spilled_rows));
assert!((38000..=44000).contains(&spilled_bytes));
let columns = result[0].columns();
let i = as_primitive_array::<Int32Type>(&columns[0])?;
assert_eq!(i.value(0), 0);
assert_eq!(i.value(i.len() - 1), 81);
assert_eq!(
task_ctx.runtime_env().memory_pool.reserved(),
0,
"The sort should have returned all memory used back to the memory manager"
);
Ok(())
}
#[tokio::test]
async fn test_batch_reservation_error() -> Result<()> {
let merge_reservation: usize = 0;
let session_config =
SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
let plan = test::scan_partitioned(1);
let expected_batch_reservation = {
let temp_ctx = Arc::new(TaskContext::default());
let mut stream = plan.execute(0, Arc::clone(&temp_ctx))?;
let first_batch = stream.next().await.unwrap()?;
get_reserved_bytes_for_record_batch(&first_batch)?
};
let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(memory_limit, 1.0)
.build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_session_config(session_config)
.with_runtime(runtime),
);
{
let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
let first_batch = stream.next().await.unwrap()?;
let batch_reservation = get_reserved_bytes_for_record_batch(&first_batch)?;
assert_eq!(batch_reservation, expected_batch_reservation);
assert!(memory_limit < (merge_reservation + batch_reservation));
}
let sort_exec = Arc::new(SortExec::new(
[PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
plan,
));
let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;
let err = result.unwrap_err();
assert!(
matches!(err, DataFusionError::Context(..)),
"Assertion failed: expected a Context error, but got: {err:?}"
);
assert!(
matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
"Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
);
let config_vector = vec![
"datafusion.runtime.memory_limit",
"datafusion.execution.sort_spill_reservation_bytes",
];
let error_message = err.message().to_string();
for config in config_vector.into_iter() {
assert!(
error_message.as_str().contains(config),
"Config: '{}' should be contained in error message: {}.",
config,
error_message.as_str()
);
}
Ok(())
}
#[tokio::test]
async fn test_sort_spill_utf8_strings() -> Result<()> {
    // Sort variable-length utf8 data under a tight 500 KiB memory limit so
    // the external sorter is forced to spill to disk, then verify row count,
    // spill metrics, global ordering, and that all memory is released.
    let session_config = SessionConfig::new()
        .with_batch_size(100)
        // In-place threshold and spill reservation tuned so this input
        // exceeds the memory limit and spills — values are empirical.
        .with_sort_in_place_threshold_bytes(20 * 1024)
        .with_sort_spill_reservation_bytes(100 * 1024);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(500 * 1024, 1.0)
        .build_arc()?;
    let task_ctx = Arc::new(
        TaskContext::default()
            .with_session_config(session_config)
            .with_runtime(runtime),
    );
    // Utf8 input scanned from partitions; totals 20_000 rows (asserted below).
    let input = test::scan_partitioned_utf8(200);
    let schema = input.schema();
    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr {
            expr: col("i", &schema)?,
            options: SortOptions::default(),
        }]
        .into(),
        Arc::new(CoalescePartitionsExec::new(input)),
    ));
    let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
    let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
    assert_eq!(num_rows, 20000);
    let metrics = sort_exec.metrics().unwrap();
    assert_eq!(metrics.output_rows().unwrap(), 20000);
    assert!(metrics.elapsed_compute().unwrap() > 0);
    // The memory limit must have forced spilling. NOTE(review): the exact
    // ranges below are empirical for this input/config; update them together
    // with any change to spill sizing.
    let spill_count = metrics.spill_count().unwrap();
    let spilled_rows = metrics.spilled_rows().unwrap();
    let spilled_bytes = metrics.spilled_bytes().unwrap();
    assert!((4..=8).contains(&spill_count));
    assert!((15000..=20000).contains(&spilled_rows));
    assert!((900000..=1000000).contains(&spilled_bytes));
    // Concatenate all output batches and check the column is globally sorted.
    let concated_result = concat_batches(&schema, &result)?;
    let columns = concated_result.columns();
    let string_array = as_string_array(&columns[0]);
    for i in 0..string_array.len() - 1 {
        assert!(string_array.value(i) <= string_array.value(i + 1));
    }
    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "The sort should have returned all memory used back to the memory manager"
    );
    Ok(())
}
#[tokio::test]
async fn test_sort_fetch_memory_calculation() -> Result<()> {
let avg_batch_size = 400;
let partitions = 4;
let test_options = vec![
(None, true),
(Some(1), false),
];
for (fetch, expect_spillage) in test_options {
let session_config = SessionConfig::new();
let sort_spill_reservation_bytes = session_config
.options()
.execution
.sort_spill_reservation_bytes;
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(
sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1.0,
)
.build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_runtime(runtime)
.with_session_config(session_config),
);
let csv = test::scan_partitioned(partitions);
let schema = csv.schema();
let sort_exec = Arc::new(
SortExec::new(
[PhysicalSortExpr {
expr: col("i", &schema)?,
options: SortOptions::default(),
}]
.into(),
Arc::new(CoalescePartitionsExec::new(csv)),
)
.with_fetch(fetch),
);
let result =
collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
assert_eq!(result.len(), 1);
let metrics = sort_exec.metrics().unwrap();
let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
}
Ok(())
}
#[tokio::test]
async fn test_sort_memory_reduction_per_batch() -> Result<()> {
    // As the sorted output is streamed out batch-by-batch, the memory pool
    // reservation must be non-increasing between batches and reach zero once
    // the stream is fully consumed.
    let batch_size = 50;
    let num_rows = 1000;
    // NOTE(review): the huge in-place threshold appears intended to keep the
    // sort on a single code path for this test — confirm against SortExec.
    let task_ctx = Arc::new(
        TaskContext::default().with_session_config(
            SessionConfig::new()
                .with_batch_size(batch_size)
                .with_sort_in_place_threshold_bytes(usize::MAX),
        ),
    );
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    // One input batch of 1000 values in descending order (0..1000 reversed).
    let mut values: Vec<i32> = (0..num_rows).collect();
    values.reverse();
    let input_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(values))],
    )?;
    let batches = vec![input_batch];
    let sort_exec = Arc::new(SortExec::new(
        [PhysicalSortExpr {
            expr: Arc::new(Column::new("a", 0)),
            options: SortOptions::default(),
        }]
        .into(),
        TestMemoryExec::try_new_exec(
            std::slice::from_ref(&batches),
            Arc::clone(&schema),
            None,
        )?,
    ));
    let mut stream = sort_exec.execute(0, Arc::clone(&task_ctx))?;
    // Baseline reservation is taken before any output batch is emitted.
    let mut previous_reserved = task_ctx.runtime_env().memory_pool.reserved();
    let mut batch_count = 0;
    while let Some(result) = stream.next().await {
        let batch = result?;
        batch_count += 1;
        assert!(batch.num_rows() > 0, "Batch should not be empty");
        let current_reserved = task_ctx.runtime_env().memory_pool.reserved();
        // Skip the first batch: its reservation is compared against the
        // pre-output baseline, which is not meaningful.
        if batch_count > 1 {
            assert!(
                current_reserved <= previous_reserved,
                "Memory reservation should decrease or stay same as batches are emitted. \
                Batch {batch_count}: previous={previous_reserved}, current={current_reserved}"
            );
        }
        previous_reserved = current_reserved;
    }
    // 1000 rows at batch size 50 must yield multiple output batches, so the
    // monotonicity check above actually ran.
    assert!(
        batch_count > 1,
        "Expected multiple batches to be emitted, got {batch_count}"
    );
    assert_eq!(
        task_ctx.runtime_env().memory_pool.reserved(),
        0,
        "All memory should be returned after consuming all batches"
    );
    Ok(())
}
#[tokio::test]
async fn test_sort_metadata() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let field_metadata: HashMap<String, String> =
vec![("foo".to_string(), "bar".to_string())]
.into_iter()
.collect();
let schema_metadata: HashMap<String, String> =
vec![("baz".to_string(), "barf".to_string())]
.into_iter()
.collect();
let mut field = Field::new("field_name", DataType::UInt64, true);
field.set_metadata(field_metadata.clone());
let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
let schema = Arc::new(schema);
let data: ArrayRef =
Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
let input =
TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
let sort_exec = Arc::new(SortExec::new(
[PhysicalSortExpr {
expr: col("field_name", &schema)?,
options: SortOptions::default(),
}]
.into(),
input,
));
let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
let expected_data: ArrayRef =
Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
let expected_batch =
RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
assert_eq!(&vec![expected_batch], &result);
assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
assert_eq!(result[0].schema().metadata(), &schema_metadata);
Ok(())
}
#[tokio::test]
async fn test_lex_sort_by_mixed_types() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new(
"b",
DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
true,
),
]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(3)]),
Some(vec![Some(1)]),
Some(vec![Some(6), None]),
Some(vec![Some(5)]),
])),
],
)?;
let sort_exec = Arc::new(SortExec::new(
[
PhysicalSortExpr {
expr: col("a", &schema)?,
options: SortOptions {
descending: false,
nulls_first: true,
},
},
PhysicalSortExpr {
expr: col("b", &schema)?,
options: SortOptions {
descending: true,
nulls_first: false,
},
},
]
.into(),
TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
));
assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
assert_eq!(
DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
*sort_exec.schema().field(1).data_type()
);
let result: Vec<RecordBatch> =
collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
let metrics = sort_exec.metrics().unwrap();
assert!(metrics.elapsed_compute().unwrap() > 0);
assert_eq!(metrics.output_rows().unwrap(), 4);
assert_eq!(result.len(), 1);
let expected = RecordBatch::try_new(
schema,
vec![
Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1)]),
Some(vec![Some(6), None]),
Some(vec![Some(5)]),
Some(vec![Some(3)]),
])),
],
)?;
assert_eq!(expected, result[0]);
Ok(())
}
#[tokio::test]
async fn test_lex_sort_by_float() -> Result<()> {
    // Two-key float sort exercising NaN and NULL ordering:
    // "a" (f32) descending with nulls first, "b" (f64) ascending, nulls last.
    // The expected output below shows NaN ordering greater than all numeric
    // values, so descending "a" yields NULL, NULL, NaN, NaN, 3, 2, 1, 1.
    let task_ctx = Arc::new(TaskContext::default());
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Float32, true),
        Field::new("b", DataType::Float64, true),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Float32Array::from(vec![
                Some(f32::NAN),
                None,
                None,
                Some(f32::NAN),
                Some(1.0_f32),
                Some(1.0_f32),
                Some(2.0_f32),
                Some(3.0_f32),
            ])),
            Arc::new(Float64Array::from(vec![
                Some(200.0_f64),
                Some(20.0_f64),
                Some(10.0_f64),
                Some(100.0_f64),
                Some(f64::NAN),
                None,
                None,
                Some(f64::NAN),
            ])),
        ],
    )?;
    let sort_exec = Arc::new(SortExec::new(
        [
            PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions {
                    descending: true,
                    nulls_first: true,
                },
            },
            PhysicalSortExpr {
                expr: col("b", &schema)?,
                options: SortOptions {
                    descending: false,
                    nulls_first: false,
                },
            },
        ]
        .into(),
        TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
    ));
    // Sorting must not change the output schema.
    assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
    assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
    let result: Vec<RecordBatch> =
        collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
    let metrics = sort_exec.metrics().unwrap();
    assert!(metrics.elapsed_compute().unwrap() > 0);
    assert_eq!(metrics.output_rows().unwrap(), 8);
    assert_eq!(result.len(), 1);
    let columns = result[0].columns();
    assert_eq!(DataType::Float32, *columns[0].data_type());
    assert_eq!(DataType::Float64, *columns[1].data_type());
    let a = as_primitive_array::<Float32Type>(&columns[0])?;
    let b = as_primitive_array::<Float64Type>(&columns[1])?;
    // Render each row as (Option<String>, Option<String>) so NaN and NULL can
    // be compared textually against the expected ordering below.
    let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
        .map(|i| {
            let aval = if a.is_valid(i) {
                Some(a.value(i).to_string())
            } else {
                None
            };
            let bval = if b.is_valid(i) {
                Some(b.value(i).to_string())
            } else {
                None
            };
            (aval, bval)
        })
        .collect();
    // Ties on "a" (the two NULLs, the two NaNs, the two 1.0s) are broken by
    // "b" ascending with nulls last: 10 < 20, 100 < 200, and NaN < NULL.
    let expected: Vec<(Option<String>, Option<String>)> = vec![
        (None, Some("10".to_owned())),
        (None, Some("20".to_owned())),
        (Some("NaN".to_owned()), Some("100".to_owned())),
        (Some("NaN".to_owned()), Some("200".to_owned())),
        (Some("3".to_owned()), Some("NaN".to_owned())),
        (Some("2".to_owned()), None),
        (Some("1".to_owned()), Some("NaN".to_owned())),
        (Some("1".to_owned()), None),
    ];
    assert_eq!(expected, result);
    Ok(())
}
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
let refs = blocking_exec.refs();
let sort_exec = Arc::new(SortExec::new(
[PhysicalSortExpr {
expr: col("a", &schema)?,
options: SortOptions::default(),
}]
.into(),
blocking_exec,
));
let fut = collect(sort_exec, Arc::clone(&task_ctx));
let mut fut = fut.boxed();
assert_is_pending(&mut fut);
drop(fut);
assert_strong_count_converges_to_zero(refs).await;
assert_eq!(
task_ctx.runtime_env().memory_pool.reserved(),
0,
"The sort should have returned all memory used back to the memory manager"
);
Ok(())
}
#[test]
fn test_empty_sort_batch() {
    // A zero-column batch with an explicit row count of 1 must survive
    // sorting on a literal expression with its row count intact.
    let schema = Arc::new(Schema::empty());
    let options = RecordBatchOptions::new().with_row_count(Some(1));
    let batch =
        RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
            .unwrap();
    let expressions = [PhysicalSortExpr::new_default(Arc::new(Literal::new(
        ScalarValue::Int64(Some(1)),
    )))]
    .into();
    let result = sort_batch(&batch, &expressions, None).unwrap();
    assert_eq!(result.num_rows(), 1);
}
#[tokio::test]
async fn topk_unbounded_source() -> Result<()> {
    // A SortExec with a fetch over an unbounded, already-sorted source must
    // produce exactly `fetch` rows and terminate instead of trying to drain
    // the infinite input.
    let task_ctx = Arc::new(TaskContext::default());
    let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
    let source = SortedUnboundedExec {
        schema: schema.clone(),
        batch_size: 2,
        cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new(
            schema.clone(),
        ))),
    };
    let mut plan = SortExec::new(
        [PhysicalSortExpr::new_default(Arc::new(Column::new(
            "c1", 0,
        )))]
        .into(),
        Arc::new(source),
    );
    // Limit the sort to the first 9 rows (TopK path).
    plan = plan.with_fetch(Some(9));
    let batches = collect(Arc::new(plan), task_ctx).await?;
    assert_snapshot!(batches_to_string(&batches), @r"
+----+
| c1 |
+----+
| 0  |
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
| 6  |
| 7  |
| 8  |
+----+
");
    Ok(())
}
#[tokio::test]
async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
    // The sort's output stream must be re-chunked to the configured batch
    // size regardless of the input batch sizes.
    let batch_size = 100;
    let create_task_ctx = |_: &[RecordBatch]| {
        TaskContext::default().with_session_config(
            SessionConfig::new()
                .with_batch_size(batch_size)
                .with_sort_in_place_threshold_bytes(usize::MAX),
        )
    };
    // Inputs smaller than, slightly larger than, and a multiple of the
    // requested output batch size.
    for input_batch_size in [batch_size / 4, batch_size + 7, batch_size * 3] {
        test_sort_output_batch_size(10, input_batch_size, create_task_ctx).await?;
    }
    Ok(())
}
#[tokio::test]
async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place()
-> Result<()> {
    // Same re-chunking guarantee, with a configuration that must complete
    // without any spilling.
    let batch_size = 100;
    let create_task_ctx = |_: &[RecordBatch]| {
        TaskContext::default().with_session_config(
            SessionConfig::new()
                .with_batch_size(batch_size)
                .with_sort_in_place_threshold_bytes(usize::MAX - 1),
        )
    };
    for input_batch_size in [batch_size / 4, batch_size + 7, batch_size * 3] {
        let metrics =
            test_sort_output_batch_size(10, input_batch_size, create_task_ctx)
                .await?;
        assert_eq!(
            metrics.spill_count(),
            Some(0),
            "Expected no spills when sorting in place"
        );
    }
    Ok(())
}
#[tokio::test]
async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch()
-> Result<()> {
    // Even when the whole input is a single batch, the output must be
    // re-chunked to the configured batch size without spilling.
    let batch_size = 100;
    let create_task_ctx = |_: &[RecordBatch]| {
        TaskContext::default()
            .with_session_config(SessionConfig::new().with_batch_size(batch_size))
    };
    for input_batch_size in [batch_size / 4, batch_size + 7, batch_size * 3] {
        let metrics =
            test_sort_output_batch_size(1, input_batch_size, create_task_ctx)
                .await?;
        assert_eq!(
            metrics.spill_count(),
            Some(0),
            "Expected no spills when sorting in place"
        );
    }
    Ok(())
}
#[tokio::test]
async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill()
-> Result<()> {
    // Cap the memory pool at exactly the input's size so the external sorter
    // is forced to spill, and verify output batch sizing still holds on the
    // spill/merge path.
    let batch_size = 100;
    let create_task_ctx = |generated_batches: &[RecordBatch]| {
        let batches_memory: usize = generated_batches
            .iter()
            .map(|b| b.get_array_memory_size())
            .sum();
        TaskContext::default()
            .with_session_config(
                SessionConfig::new()
                    .with_batch_size(batch_size)
                    .with_sort_in_place_threshold_bytes(1)
                    .with_sort_spill_reservation_bytes(1),
            )
            .with_runtime(
                RuntimeEnvBuilder::default()
                    .with_memory_limit(batches_memory, 1.0)
                    .build_arc()
                    .unwrap(),
            )
    };
    for input_batch_size in [batch_size / 4, batch_size + 7, batch_size * 3] {
        let metrics =
            test_sort_output_batch_size(10, input_batch_size, create_task_ctx)
                .await?;
        assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
    }
    Ok(())
}
/// Generates `number_of_batches` input batches of `batch_size_to_generate`
/// rows each, sorts them, and asserts every output batch has exactly the
/// configured batch size except possibly the last one (the remainder).
/// Returns the sort's metrics for further inspection by the caller.
async fn test_sort_output_batch_size(
    number_of_batches: usize,
    batch_size_to_generate: usize,
    create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
) -> Result<MetricsSet> {
    let batches: Vec<_> = (0..number_of_batches)
        .map(|_| make_partition(batch_size_to_generate as i32))
        .collect();
    let task_ctx = create_task_ctx(&batches);
    let expected_batch_size = task_ctx.session_config().batch_size();

    let (mut output_batches, metrics) =
        run_sort_on_input(task_ctx, "i", batches).await?;

    // All batches before the last must be completely full.
    let last_batch = output_batches.pop().unwrap();
    for batch in &output_batches {
        assert_eq!(batch.num_rows(), expected_batch_size);
    }

    // The final batch holds the remainder (or a full batch when the total
    // divides evenly).
    let total_rows = batch_size_to_generate * number_of_batches;
    let remainder = total_rows % expected_batch_size;
    let last_expected_batch_size = if remainder == 0 {
        expected_batch_size
    } else {
        remainder
    };
    assert_eq!(last_batch.num_rows(), last_expected_batch_size);
    Ok(metrics)
}
async fn run_sort_on_input(
task_ctx: TaskContext,
order_by_col: &str,
batches: Vec<RecordBatch>,
) -> Result<(Vec<RecordBatch>, MetricsSet)> {
let task_ctx = Arc::new(task_ctx);
let schema = batches[0].schema();
let ordering: LexOrdering = [PhysicalSortExpr {
expr: col(order_by_col, &schema)?,
options: SortOptions {
descending: false,
nulls_first: true,
},
}]
.into();
let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
ordering.clone(),
TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
));
let sorted_batches =
collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
let metrics = sort_exec.metrics().expect("sort have metrics");
{
let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
let sorted_batches_concat =
concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
assert_eq!(sorted_input_batch, sorted_batches_concat);
}
Ok((sorted_batches, metrics))
}
#[tokio::test]
async fn test_sort_batch_chunked_basic() -> Result<()> {
    // 1000 descending values chunked into output batches of at most 250 rows:
    // four chunks whose concatenation is globally sorted 0..=999.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let values: Vec<i32> = (0..1000).rev().collect();
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(values))],
    )?;
    let expressions: LexOrdering =
        [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

    let result_batches = sort_batch_chunked(&batch, &expressions, 250)?;
    assert_eq!(result_batches.len(), 4);

    let mut total_rows = 0;
    for (i, chunk) in result_batches.iter().enumerate() {
        assert!(
            chunk.num_rows() <= 250,
            "Batch {} has {} rows, expected <= 250",
            i,
            chunk.num_rows()
        );
        total_rows += chunk.num_rows();
    }
    assert_eq!(total_rows, 1000);

    let concatenated = concat_batches(&schema, &result_batches)?;
    let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
    for idx in 0..array.len() - 1 {
        assert!(
            array.value(idx) <= array.value(idx + 1),
            "Array not sorted at position {}: {} > {}",
            idx,
            array.value(idx),
            array.value(idx + 1)
        );
    }
    assert_eq!(array.value(0), 0);
    assert_eq!(array.value(array.len() - 1), 999);
    Ok(())
}
#[tokio::test]
async fn test_sort_batch_chunked_smaller_than_batch_size() -> Result<()> {
    // When the input (50 rows) is smaller than the chunk size (100), a single
    // fully sorted batch comes back.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from((0..50).rev().collect::<Vec<i32>>()))],
    )?;
    let sort_exprs: LexOrdering =
        [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

    let chunks = sort_batch_chunked(&batch, &sort_exprs, 100)?;
    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].num_rows(), 50);

    let array = as_primitive_array::<Int32Type>(chunks[0].column(0))?;
    for i in 1..array.len() {
        assert!(array.value(i - 1) <= array.value(i));
    }
    assert_eq!(array.value(0), 0);
    assert_eq!(array.value(49), 49);
    Ok(())
}
#[tokio::test]
async fn test_sort_batch_chunked_exact_multiple() -> Result<()> {
    // 1000 rows with chunk size 100 divides evenly: exactly 10 full batches.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from((0..1000).rev().collect::<Vec<i32>>()))],
    )?;
    let sort_exprs: LexOrdering =
        [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();

    let chunks = sort_batch_chunked(&batch, &sort_exprs, 100)?;
    assert_eq!(chunks.len(), 10);
    for chunk in &chunks {
        assert_eq!(chunk.num_rows(), 100);
    }

    // Concatenated output must be globally sorted.
    let concatenated = concat_batches(&schema, &chunks)?;
    let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
    for i in 1..array.len() {
        assert!(array.value(i - 1) <= array.value(i));
    }
    Ok(())
}
#[tokio::test]
async fn test_sort_batch_chunked_empty_batch() -> Result<()> {
    // An empty input batch yields no output chunks at all.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let empty = RecordBatch::new_empty(Arc::clone(&schema));
    let sort_exprs: LexOrdering =
        [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
    let chunks = sort_batch_chunked(&empty, &sort_exprs, 100)?;
    assert!(chunks.is_empty());
    Ok(())
}
#[tokio::test]
async fn test_get_reserved_bytes_for_record_batch_with_sliced_batches() -> Result<()>
{
    // A 50-row slice of a 1000-row array must reserve strictly less memory
    // than the full array, i.e. the reservation reflects the sliced size
    // rather than the backing buffer's size.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let full_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
    let sliced_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(full_array.slice(100, 50))],
    )?;
    let full_batch =
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(full_array)])?;

    let sliced_reserved = get_reserved_bytes_for_record_batch(&sliced_batch)?;
    let full_reserved = get_reserved_bytes_for_record_batch(&full_batch)?;
    assert!(full_reserved > sliced_reserved);
    Ok(())
}
/// Builds a `SortExec` over an empty input, ordered on column "a", with the
/// given `fetch` (row limit) applied.
fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
    SortExec::new([sort_expr].into(), Arc::new(EmptyExec::new(schema)))
        .with_fetch(fetch)
}
#[test]
fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
    // With a fetch (TopK), parent filters must not be pushed through the sort
    // in the Pre phase.
    let sort = make_sort_exec_with_fetch(Some(10));
    let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let desc = sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![parent_filter],
        &ConfigOptions::new(),
    )?;
    assert!(matches!(
        desc.parent_filters()[0][0].discriminant,
        PushedDown::No
    ));
    Ok(())
}
#[test]
fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
    // Without a fetch, the sort is transparent to parent filters and allows
    // them to be pushed down in the Pre phase.
    let sort = make_sort_exec_with_fetch(None);
    let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let desc = sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Pre,
        vec![parent_filter],
        &ConfigOptions::new(),
    )?;
    assert!(matches!(
        desc.parent_filters()[0][0].discriminant,
        PushedDown::Yes
    ));
    Ok(())
}
#[test]
fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
    // In the Post phase a TopK sort contributes its own dynamic filter for
    // pushdown, while still refusing to push the parent filter through.
    let sort = make_sort_exec_with_fetch(Some(10));
    assert!(sort.filter.is_some(), "TopK filter should be created");

    let mut config = ConfigOptions::new();
    config.optimizer.enable_topk_dynamic_filter_pushdown = true;

    let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let desc = sort.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![parent_filter],
        &config,
    )?;
    assert!(matches!(
        desc.parent_filters()[0][0].discriminant,
        PushedDown::No
    ));
    assert_eq!(desc.self_filters()[0].len(), 1);
    Ok(())
}
}