datafusion_physical_plan/sorts/
sort.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Sort that deals with an arbitrary size of the input.
19//! It will do in-memory sorting if it has enough memory budget
20//! but spills to disk if needed.
21
22use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28
29use crate::common::spawn_buffered;
30use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
31use crate::expressions::PhysicalSortExpr;
32use crate::filter_pushdown::{
33    ChildFilterDescription, FilterDescription, FilterPushdownPhase,
34};
35use crate::limit::LimitStream;
36use crate::metrics::{
37    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, SplitMetrics,
38};
39use crate::projection::{make_with_child, update_ordering, ProjectionExec};
40use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
41use crate::spill::get_record_batch_memory_size;
42use crate::spill::in_progress_spill_file::InProgressSpillFile;
43use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
44use crate::stream::BatchSplitStream;
45use crate::stream::RecordBatchStreamAdapter;
46use crate::topk::TopK;
47use crate::topk::TopKDynamicFilters;
48use crate::{
49    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
50    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
51    Statistics,
52};
53
54use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
55use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
56use arrow::datatypes::SchemaRef;
57use datafusion_common::config::SpillCompression;
58use datafusion_common::{
59    internal_datafusion_err, internal_err, unwrap_or_internal_err, DataFusionError,
60    Result,
61};
62use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
63use datafusion_execution::runtime_env::RuntimeEnv;
64use datafusion_execution::TaskContext;
65use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
66use datafusion_physical_expr::LexOrdering;
67use datafusion_physical_expr::PhysicalExpr;
68
69use futures::{StreamExt, TryStreamExt};
70use log::{debug, trace};
71
72struct ExternalSorterMetrics {
73    /// metrics
74    baseline: BaselineMetrics,
75
76    spill_metrics: SpillMetrics,
77
78    split_metrics: SplitMetrics,
79}
80
81impl ExternalSorterMetrics {
82    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
83        Self {
84            baseline: BaselineMetrics::new(metrics, partition),
85            spill_metrics: SpillMetrics::new(metrics, partition),
86            split_metrics: SplitMetrics::new(metrics, partition),
87        }
88    }
89}
90
91/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
92/// a total order. Depending on the input size and memory manager
93/// configuration, writes intermediate results to disk ("spills")
94/// using Arrow IPC format.
95///
96/// # Algorithm
97///
98/// 1. get a non-empty new batch from input
99///
100/// 2. check with the memory manager there is sufficient space to
101///    buffer the batch in memory.
102///
103/// 2.1 if memory is sufficient, buffer batch in memory, go to 1.
104///
105/// 2.2 if no more memory is available, sort all buffered batches and
106///     spill to file.  buffer the next batch in memory, go to 1.
107///
108/// 3. when input is exhausted, merge all in memory batches and spills
109///    to get a total order.
110///
111/// # When data fits in available memory
112///
113/// If there is sufficient memory, data is sorted in memory to produce the output
114///
115/// ```text
116///    ┌─────┐
117///    │  2  │
118///    │  3  │
119///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
120///    │  4  │
121///    │  2  │                  │
122///    └─────┘                  ▼
123///    ┌─────┐
124///    │  1  │              In memory
125///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
126///    │  1  │
127///    └─────┘                  ▲
128///      ...                    │
129///
130///    ┌─────┐                  │
131///    │  4  │
132///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
133///    └─────┘
134///
135/// in_mem_batches
136/// ```
137///
138/// # When data does not fit in available memory
139///
140///  When memory is exhausted, data is first sorted and written to one
141///  or more spill files on disk:
142///
143/// ```text
144///    ┌─────┐                               .─────────────────.
145///    │  2  │                              (                   )
146///    │  3  │                              │`─────────────────'│
147///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
148///    │  4  │             │                │  │ 1  │░          │
149///    │  2  │                              │  │... │░          │
150///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
151///    ┌─────┐                              │  └────┘░    1  │░ │
152///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
153///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
154///    │  1  │     and write to file        │           │    ░░ │
155///    └─────┘                              │             4  │░ │
156///      ...               ▲                │           └░─░─░░ │
157///                        │                │            ░░░░░░ │
158///    ┌─────┐                              │.─────────────────.│
159///    │  4  │             │                (                   )
160///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
161///    └─────┘
162///
163/// in_mem_batches                                  spills
164///                                         (file on disk in Arrow
165///                                               IPC format)
166/// ```
167///
168/// Once the input is completely read, the spill files are read and
169/// merged with any in memory batches to produce a single total sorted
170/// output:
171///
172/// ```text
173///   .─────────────────.
174///  (                   )
175///  │`─────────────────'│
176///  │  ┌────┐           │
177///  │  │ 1  │░          │
178///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
179///  │  │ 4  │░ ┌────┐   │           │
180///  │  └────┘░ │ 1  │░  │           ▼
181///  │   ░░░░░░ │    │░  │
182///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
183///  │          │    │░  │
184///  │          │ 4  │░  │           ▲
185///  │          └────┘░  │           │
186///  │           ░░░░░░  │
187///  │.─────────────────.│           │
188///  (                   )
189///   `─────────────────'            │
190///         spills
191///                                  │
192///
193///                                  │
194///
195///     ┌─────┐                      │
196///     │  1  │
197///     │  4  │─ ─ ─ ─               │
198///     └─────┘       │
199///       ...                   In memory
200///                   └ ─ ─ ─▶  sort/merge
201///     ┌─────┐
202///     │  4  │                      ▲
203///     │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
204///     └─────┘
205///
206///  in_mem_batches
207/// ```
208struct ExternalSorter {
209    // ========================================================================
210    // PROPERTIES:
211    // Fields that define the sorter's configuration and remain constant
212    // ========================================================================
213    /// Schema of the output (and the input)
214    schema: SchemaRef,
215    /// Sort expressions
216    expr: LexOrdering,
217    /// The target number of rows for output batches
218    batch_size: usize,
219    /// If the in size of buffered memory batches is below this size,
220    /// the data will be concatenated and sorted in place rather than
221    /// sort/merged.
222    sort_in_place_threshold_bytes: usize,
223
224    // ========================================================================
225    // STATE BUFFERS:
226    // Fields that hold intermediate data during sorting
227    // ========================================================================
228    /// Unsorted input batches stored in the memory buffer
229    in_mem_batches: Vec<RecordBatch>,
230
231    /// During external sorting, in-memory intermediate data will be appended to
232    /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
233    ///
234    /// this is a tuple of:
235    /// 1. `InProgressSpillFile` - the file that is being written to
236    /// 2. `max_record_batch_memory` - the maximum memory usage of a single batch in this spill file.
237    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
238    /// If data has previously been spilled, the locations of the spill files (in
239    /// Arrow IPC format)
240    /// Within the same spill file, the data might be chunked into multiple batches,
241    /// and ordered by sort keys.
242    finished_spill_files: Vec<SortedSpillFile>,
243
244    // ========================================================================
245    // EXECUTION RESOURCES:
246    // Fields related to managing execution resources and monitoring performance.
247    // ========================================================================
248    /// Runtime metrics
249    metrics: ExternalSorterMetrics,
250    /// A handle to the runtime to get spill files
251    runtime: Arc<RuntimeEnv>,
252    /// Reservation for in_mem_batches
253    reservation: MemoryReservation,
254    spill_manager: SpillManager,
255
256    /// Reservation for the merging of in-memory batches. If the sort
257    /// might spill, `sort_spill_reservation_bytes` will be
258    /// pre-reserved to ensure there is some space for this sort/merge.
259    merge_reservation: MemoryReservation,
260    /// How much memory to reserve for performing in-memory sort/merges
261    /// prior to spilling.
262    sort_spill_reservation_bytes: usize,
263}
264
265impl ExternalSorter {
266    // TODO: make a builder or some other nicer API to avoid the
267    // clippy warning
268    #[allow(clippy::too_many_arguments)]
269    pub fn new(
270        partition_id: usize,
271        schema: SchemaRef,
272        expr: LexOrdering,
273        batch_size: usize,
274        sort_spill_reservation_bytes: usize,
275        sort_in_place_threshold_bytes: usize,
276        // Configured via `datafusion.execution.spill_compression`.
277        spill_compression: SpillCompression,
278        metrics: &ExecutionPlanMetricsSet,
279        runtime: Arc<RuntimeEnv>,
280    ) -> Result<Self> {
281        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
282        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
283            .with_can_spill(true)
284            .register(&runtime.memory_pool);
285
286        let merge_reservation =
287            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
288                .register(&runtime.memory_pool);
289
290        let spill_manager = SpillManager::new(
291            Arc::clone(&runtime),
292            metrics.spill_metrics.clone(),
293            Arc::clone(&schema),
294        )
295        .with_compression_type(spill_compression);
296
297        Ok(Self {
298            schema,
299            in_mem_batches: vec![],
300            in_progress_spill_file: None,
301            finished_spill_files: vec![],
302            expr,
303            metrics,
304            reservation,
305            spill_manager,
306            merge_reservation,
307            runtime,
308            batch_size,
309            sort_spill_reservation_bytes,
310            sort_in_place_threshold_bytes,
311        })
312    }
313
314    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
315    ///
316    /// Updates memory usage metrics, and possibly triggers spilling to disk
317    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
318        if input.num_rows() == 0 {
319            return Ok(());
320        }
321
322        self.reserve_memory_for_merge()?;
323        self.reserve_memory_for_batch_and_maybe_spill(&input)
324            .await?;
325
326        self.in_mem_batches.push(input);
327        Ok(())
328    }
329
330    fn spilled_before(&self) -> bool {
331        !self.finished_spill_files.is_empty()
332    }
333
334    /// Returns the final sorted output of all batches inserted via
335    /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
336    ///
337    /// This process could either be:
338    ///
339    /// 1. An in-memory sort/merge (if the input fit in memory)
340    ///
341    /// 2. A combined streaming merge incorporating both in-memory
342    ///    batches and data from spill files on disk.
343    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
344        // Release the memory reserved for merge back to the pool so
345        // there is some left when `in_mem_sort_stream` requests an
346        // allocation.
347        self.merge_reservation.free();
348
349        if self.spilled_before() {
350            // Sort `in_mem_batches` and spill it first. If there are many
351            // `in_mem_batches` and the memory limit is almost reached, merging
352            // them with the spilled files at the same time might cause OOM.
353            if !self.in_mem_batches.is_empty() {
354                self.sort_and_spill_in_mem_batches().await?;
355            }
356
357            StreamingMergeBuilder::new()
358                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
359                .with_spill_manager(self.spill_manager.clone())
360                .with_schema(Arc::clone(&self.schema))
361                .with_expressions(&self.expr.clone())
362                .with_metrics(self.metrics.baseline.clone())
363                .with_batch_size(self.batch_size)
364                .with_fetch(None)
365                .with_reservation(self.merge_reservation.new_empty())
366                .build()
367        } else {
368            self.in_mem_sort_stream(self.metrics.baseline.clone())
369        }
370    }
371
372    /// How much memory is buffered in this `ExternalSorter`?
373    fn used(&self) -> usize {
374        self.reservation.size()
375    }
376
377    /// How many bytes have been spilled to disk?
378    fn spilled_bytes(&self) -> usize {
379        self.metrics.spill_metrics.spilled_bytes.value()
380    }
381
382    /// How many rows have been spilled to disk?
383    fn spilled_rows(&self) -> usize {
384        self.metrics.spill_metrics.spilled_rows.value()
385    }
386
387    /// How many spill files have been created?
388    fn spill_count(&self) -> usize {
389        self.metrics.spill_metrics.spill_file_count.value()
390    }
391
392    /// Appending globally sorted batches to the in-progress spill file, and clears
393    /// the `globally_sorted_batches` (also its memory reservation) afterwards.
394    async fn consume_and_spill_append(
395        &mut self,
396        globally_sorted_batches: &mut Vec<RecordBatch>,
397    ) -> Result<()> {
398        if globally_sorted_batches.is_empty() {
399            return Ok(());
400        }
401
402        // Lazily initialize the in-progress spill file
403        if self.in_progress_spill_file.is_none() {
404            self.in_progress_spill_file =
405                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
406        }
407
408        Self::organize_stringview_arrays(globally_sorted_batches)?;
409
410        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
411
412        let batches_to_spill = std::mem::take(globally_sorted_batches);
413        self.reservation.free();
414
415        let (in_progress_file, max_record_batch_size) =
416            self.in_progress_spill_file.as_mut().ok_or_else(|| {
417                internal_datafusion_err!("In-progress spill file should be initialized")
418            })?;
419
420        for batch in batches_to_spill {
421            in_progress_file.append_batch(&batch)?;
422
423            *max_record_batch_size =
424                (*max_record_batch_size).max(batch.get_sliced_size()?);
425        }
426
427        if !globally_sorted_batches.is_empty() {
428            return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
429        }
430
431        Ok(())
432    }
433
434    /// Finishes the in-progress spill file and moves it to the finished spill files.
435    async fn spill_finish(&mut self) -> Result<()> {
436        let (mut in_progress_file, max_record_batch_memory) =
437            self.in_progress_spill_file.take().ok_or_else(|| {
438                internal_datafusion_err!("Should be called after `spill_append`")
439            })?;
440        let spill_file = in_progress_file.finish()?;
441
442        if let Some(spill_file) = spill_file {
443            self.finished_spill_files.push(SortedSpillFile {
444                file: spill_file,
445                max_record_batch_memory,
446            });
447        }
448
449        Ok(())
450    }
451
452    /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
453    /// `StringViewArray` in sequential order by calling `gc()` on them.
454    ///
455    /// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
456    /// available
457    ///
458    /// # Rationale
459    /// After (merge-based) sorting, all batches will be sorted into a single run,
460    /// but physically this sorted run is chunked into many small batches. For
461    /// `StringViewArray`s inside each sorted run, their inner buffers are not
462    /// re-constructed by default, leading to non-sequential payload locations
463    /// (permutated by `interleave()` Arrow kernel). A single payload buffer might
464    /// be shared by multiple `RecordBatch`es.
465    /// When writing each batch to disk, the writer has to write all referenced buffers,
466    /// because they have to be read back one by one to reduce memory usage. This
467    /// causes extra disk reads and writes, and potentially execution failure.
468    ///
469    /// # Example
470    /// Before sorting:
471    /// batch1 -> buffer1
472    /// batch2 -> buffer2
473    ///
474    /// sorted_batch1 -> buffer1
475    ///               -> buffer2
476    /// sorted_batch2 -> buffer1
477    ///               -> buffer2
478    ///
479    /// Then when spilling each batch, the writer has to write all referenced buffers
480    /// repeatedly.
481    fn organize_stringview_arrays(
482        globally_sorted_batches: &mut Vec<RecordBatch>,
483    ) -> Result<()> {
484        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
485
486        for batch in globally_sorted_batches.drain(..) {
487            let mut new_columns: Vec<Arc<dyn Array>> =
488                Vec::with_capacity(batch.num_columns());
489
490            let mut arr_mutated = false;
491            for array in batch.columns() {
492                if let Some(string_view_array) =
493                    array.as_any().downcast_ref::<StringViewArray>()
494                {
495                    let new_array = string_view_array.gc();
496                    new_columns.push(Arc::new(new_array));
497                    arr_mutated = true;
498                } else {
499                    new_columns.push(Arc::clone(array));
500                }
501            }
502
503            let organized_batch = if arr_mutated {
504                RecordBatch::try_new(batch.schema(), new_columns)?
505            } else {
506                batch
507            };
508
509            organized_batches.push(organized_batch);
510        }
511
512        *globally_sorted_batches = organized_batches;
513
514        Ok(())
515    }
516
517    /// Sorts the in-memory batches and merges them into a single sorted run, then writes
518    /// the result to spill files.
519    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
520        if self.in_mem_batches.is_empty() {
521            return internal_err!(
522                "in_mem_batches must not be empty when attempting to sort and spill"
523            );
524        }
525
526        // Release the memory reserved for merge back to the pool so
527        // there is some left when `in_mem_sort_stream` requests an
528        // allocation. At the end of this function, memory will be
529        // reserved again for the next spill.
530        self.merge_reservation.free();
531
532        let mut sorted_stream =
533            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
534        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
535        // to construct a globally sorted stream.
536        if !self.in_mem_batches.is_empty() {
537            return internal_err!(
538                "in_mem_batches should be empty after constructing sorted stream"
539            );
540        }
541        // 'global' here refers to all buffered batches when the memory limit is
542        // reached. This variable will buffer the sorted batches after
543        // sort-preserving merge and incrementally append to spill files.
544        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
545
546        while let Some(batch) = sorted_stream.next().await {
547            let batch = batch?;
548            let sorted_size = get_reserved_byte_for_record_batch(&batch);
549            if self.reservation.try_grow(sorted_size).is_err() {
550                // Although the reservation is not enough, the batch is
551                // already in memory, so it's okay to combine it with previously
552                // sorted batches, and spill together.
553                globally_sorted_batches.push(batch);
554                self.consume_and_spill_append(&mut globally_sorted_batches)
555                    .await?; // reservation is freed in spill()
556            } else {
557                globally_sorted_batches.push(batch);
558            }
559        }
560
561        // Drop early to free up memory reserved by the sorted stream, otherwise the
562        // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
563        drop(sorted_stream);
564
565        self.consume_and_spill_append(&mut globally_sorted_batches)
566            .await?;
567        self.spill_finish().await?;
568
569        // Sanity check after spilling
570        let buffers_cleared_property =
571            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
572        if !buffers_cleared_property {
573            return internal_err!(
574                "in_mem_batches and globally_sorted_batches should be cleared before"
575            );
576        }
577
578        // Reserve headroom for next sort/merge
579        self.reserve_memory_for_merge()?;
580
581        Ok(())
582    }
583
584    /// Consumes in_mem_batches returning a sorted stream of
585    /// batches. This proceeds in one of two ways:
586    ///
587    /// # Small Datasets
588    ///
589    /// For "smaller" datasets, the data is first concatenated into a
590    /// single batch and then sorted. This is often faster than
591    /// sorting and then merging.
592    ///
593    /// ```text
594    ///        ┌─────┐
595    ///        │  2  │
596    ///        │  3  │
597    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
598    ///        │  4  │                     │  2  │
599    ///        │  2  │        │            │  3  │
600    ///        └─────┘                     │  1  │             sorted output
601    ///        ┌─────┐        ▼            │  4  │                stream
602    ///        │  1  │                     │  2  │
603    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
604    ///        │  1  │                     │  4  │
605    ///        └─────┘        ▲            │  1  │
606    ///          ...          │            │ ... │
607    ///                                    │  4  │
608    ///        ┌─────┐        │            │  3  │
609    ///        │  4  │                     └─────┘
610    ///        │  3  │─ ─ ─ ─ ┘
611    ///        └─────┘
612    ///     in_mem_batches
613    /// ```
614    ///
615    /// # Larger datasets
616    ///
617    /// For larger datasets, the batches are first sorted individually
618    /// and then merged together.
619    ///
620    /// ```text
621    ///      ┌─────┐                ┌─────┐
622    ///      │  2  │                │  1  │
623    ///      │  3  │                │  2  │
624    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
625    ///      │  4  │                │  3  │
626    ///      │  2  │                │  4  │          │
627    ///      └─────┘                └─────┘               sorted output
628    ///      ┌─────┐                ┌─────┐          ▼       stream
629    ///      │  1  │                │  1  │
630    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
631    ///      │  1  │                │  4  │
632    ///      └─────┘                └─────┘          ▲
633    ///        ...       ...         ...             │
634    ///
635    ///      ┌─────┐                ┌─────┐          │
636    ///      │  4  │                │  3  │
637    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
638    ///      └─────┘                └─────┘
639    ///
640    ///   in_mem_batches
641    /// ```
642    fn in_mem_sort_stream(
643        &mut self,
644        metrics: BaselineMetrics,
645    ) -> Result<SendableRecordBatchStream> {
646        if self.in_mem_batches.is_empty() {
647            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
648                &self.schema,
649            ))));
650        }
651
652        // The elapsed compute timer is updated when the value is dropped.
653        // There is no need for an explicit call to drop.
654        let elapsed_compute = metrics.elapsed_compute().clone();
655        let _timer = elapsed_compute.timer();
656
657        // Please pay attention that any operation inside of `in_mem_sort_stream` will
658        // not perform any memory reservation. This is for avoiding the need of handling
659        // reservation failure and spilling in the middle of the sort/merge. The memory
660        // space for batches produced by the resulting stream will be reserved by the
661        // consumer of the stream.
662
663        if self.in_mem_batches.len() == 1 {
664            let batch = self.in_mem_batches.swap_remove(0);
665            let reservation = self.reservation.take();
666            return self.sort_batch_stream(batch, metrics, reservation, true);
667        }
668
669        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
670        if self.reservation.size() < self.sort_in_place_threshold_bytes {
671            // Concatenate memory batches together and sort
672            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
673            self.in_mem_batches.clear();
674            self.reservation
675                .try_resize(get_reserved_byte_for_record_batch(&batch))
676                .map_err(Self::err_with_oom_context)?;
677            let reservation = self.reservation.take();
678            return self.sort_batch_stream(batch, metrics, reservation, true);
679        }
680
681        let streams = std::mem::take(&mut self.in_mem_batches)
682            .into_iter()
683            .map(|batch| {
684                let metrics = self.metrics.baseline.intermediate();
685                let reservation = self
686                    .reservation
687                    .split(get_reserved_byte_for_record_batch(&batch));
688                let input = self.sort_batch_stream(
689                    batch,
690                    metrics,
691                    reservation,
692                    // Passing false as `StreamingMergeBuilder` will split the
693                    // stream into batches of `self.batch_size` rows.
694                    false,
695                )?;
696                Ok(spawn_buffered(input, 1))
697            })
698            .collect::<Result<_>>()?;
699
700        StreamingMergeBuilder::new()
701            .with_streams(streams)
702            .with_schema(Arc::clone(&self.schema))
703            .with_expressions(&self.expr.clone())
704            .with_metrics(metrics)
705            .with_batch_size(self.batch_size)
706            .with_fetch(None)
707            .with_reservation(self.merge_reservation.new_empty())
708            .build()
709    }
710
711    /// Sorts a single `RecordBatch` into a single stream.
712    ///
713    /// `reservation` accounts for the memory used by this batch and
714    /// is released when the sort is complete
715    ///
716    /// passing `split` true will return a [`BatchSplitStream`] where each batch maximum row count
717    /// will be `self.batch_size`.
718    /// If `split` is false, the stream will return a single batch
719    fn sort_batch_stream(
720        &self,
721        batch: RecordBatch,
722        metrics: BaselineMetrics,
723        reservation: MemoryReservation,
724        mut split: bool,
725    ) -> Result<SendableRecordBatchStream> {
726        assert_eq!(
727            get_reserved_byte_for_record_batch(&batch),
728            reservation.size()
729        );
730
731        split = split && batch.num_rows() > self.batch_size;
732
733        let schema = batch.schema();
734
735        let expressions = self.expr.clone();
736        let stream = futures::stream::once(async move {
737            let _timer = metrics.elapsed_compute().timer();
738
739            let sorted = sort_batch(&batch, &expressions, None)?;
740
741            metrics.record_output(sorted.num_rows());
742            drop(batch);
743            drop(reservation);
744            Ok(sorted)
745        });
746
747        let mut output: SendableRecordBatchStream =
748            Box::pin(RecordBatchStreamAdapter::new(schema, stream));
749
750        if split {
751            output = Box::pin(BatchSplitStream::new(
752                output,
753                self.batch_size,
754                self.metrics.split_metrics.clone(),
755            ));
756        }
757
758        Ok(output)
759    }
760
761    /// If this sort may spill, pre-allocates
762    /// `sort_spill_reservation_bytes` of memory to guarantee memory
763    /// left for the in memory sort/merge.
764    fn reserve_memory_for_merge(&mut self) -> Result<()> {
765        // Reserve headroom for next merge sort
766        if self.runtime.disk_manager.tmp_files_enabled() {
767            let size = self.sort_spill_reservation_bytes;
768            if self.merge_reservation.size() != size {
769                self.merge_reservation
770                    .try_resize(size)
771                    .map_err(Self::err_with_oom_context)?;
772            }
773        }
774
775        Ok(())
776    }
777
778    /// Reserves memory to be able to accommodate the given batch.
779    /// If memory is scarce, tries to spill current in-memory batches to disk first.
780    async fn reserve_memory_for_batch_and_maybe_spill(
781        &mut self,
782        input: &RecordBatch,
783    ) -> Result<()> {
784        let size = get_reserved_byte_for_record_batch(input);
785
786        match self.reservation.try_grow(size) {
787            Ok(_) => Ok(()),
788            Err(e) => {
789                if self.in_mem_batches.is_empty() {
790                    return Err(Self::err_with_oom_context(e));
791                }
792
793                // Spill and try again.
794                self.sort_and_spill_in_mem_batches().await?;
795                self.reservation
796                    .try_grow(size)
797                    .map_err(Self::err_with_oom_context)
798            }
799        }
800    }
801
802    /// Wraps the error with a context message suggesting settings to tweak.
803    /// This is meant to be used with DataFusionError::ResourcesExhausted only.
804    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
805        match e {
806            DataFusionError::ResourcesExhausted(_) => e.context(
807                "Not enough memory to continue external sort. \
808                    Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
809            ),
810            // This is not an OOM error, so just return it as is.
811            _ => e,
812        }
813    }
814}
815
816/// Estimate how much memory is needed to sort a `RecordBatch`.
817///
818/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
819/// creating sorted copies of sorted columns in record batches for speeding up comparison
820/// in sorting and merging. The sorted copies are in either row format or array format.
821/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
822/// sorted copies are, they will use more memory than the original record batch.
823pub(crate) fn get_reserved_byte_for_record_batch_size(record_batch_size: usize) -> usize {
824    // 2x may not be enough for some cases, but it's a good start.
825    // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
826    // to compensate for the extra memory needed.
827    record_batch_size * 2
828}
829
830/// Estimate how much memory is needed to sort a `RecordBatch`.
831fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
832    get_reserved_byte_for_record_batch_size(get_record_batch_memory_size(batch))
833}
834
835impl Debug for ExternalSorter {
836    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
837        f.debug_struct("ExternalSorter")
838            .field("memory_used", &self.used())
839            .field("spilled_bytes", &self.spilled_bytes())
840            .field("spilled_rows", &self.spilled_rows())
841            .field("spill_count", &self.spill_count())
842            .finish()
843    }
844}
845
846pub fn sort_batch(
847    batch: &RecordBatch,
848    expressions: &LexOrdering,
849    fetch: Option<usize>,
850) -> Result<RecordBatch> {
851    let sort_columns = expressions
852        .iter()
853        .map(|expr| expr.evaluate_to_sort_column(batch))
854        .collect::<Result<Vec<_>>>()?;
855
856    let indices = lexsort_to_indices(&sort_columns, fetch)?;
857    let mut columns = take_arrays(batch.columns(), &indices, None)?;
858
859    // The columns may be larger than the unsorted columns in `batch` especially for variable length
860    // data types due to exponential growth when building the sort columns. We shrink the columns
861    // to prevent memory reservation failures, as well as excessive memory allocation when running
862    // merges in `SortPreservingMergeStream`.
863    columns.iter_mut().for_each(|c| {
864        c.shrink_to_fit();
865    });
866
867    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
868    Ok(RecordBatch::try_new_with_options(
869        batch.schema(),
870        columns,
871        &options,
872    )?)
873}
874
875/// Sort execution plan.
876///
877/// Support sorting datasets that are larger than the memory allotted
878/// by the memory manager, by spilling to disk.
879#[derive(Debug, Clone)]
880pub struct SortExec {
881    /// Input schema
882    pub(crate) input: Arc<dyn ExecutionPlan>,
883    /// Sort expressions
884    expr: LexOrdering,
885    /// Containing all metrics set created during sort
886    metrics_set: ExecutionPlanMetricsSet,
887    /// Preserve partitions of input plan. If false, the input partitions
888    /// will be sorted and merged into a single output partition.
889    preserve_partitioning: bool,
890    /// Fetch highest/lowest n results
891    fetch: Option<usize>,
892    /// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
893    common_sort_prefix: Vec<PhysicalSortExpr>,
894    /// Cache holding plan properties like equivalences, output partitioning etc.
895    cache: PlanProperties,
896    /// Filter matching the state of the sort for dynamic filter pushdown.
897    /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
898    /// If `fetch` is `None`, this will be `None`.
899    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
900}
901
902impl SortExec {
903    /// Create a new sort execution plan that produces a single,
904    /// sorted output partition.
905    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
906        let preserve_partitioning = false;
907        let (cache, sort_prefix) =
908            Self::compute_properties(&input, expr.clone(), preserve_partitioning)
909                .unwrap();
910        Self {
911            expr,
912            input,
913            metrics_set: ExecutionPlanMetricsSet::new(),
914            preserve_partitioning,
915            fetch: None,
916            common_sort_prefix: sort_prefix,
917            cache,
918            filter: None,
919        }
920    }
921
922    /// Whether this `SortExec` preserves partitioning of the children
923    pub fn preserve_partitioning(&self) -> bool {
924        self.preserve_partitioning
925    }
926
927    /// Specify the partitioning behavior of this sort exec
928    ///
929    /// If `preserve_partitioning` is true, sorts each partition
930    /// individually, producing one sorted stream for each input partition.
931    ///
932    /// If `preserve_partitioning` is false, sorts and merges all
933    /// input partitions producing a single, sorted partition.
934    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
935        self.preserve_partitioning = preserve_partitioning;
936        self.cache = self
937            .cache
938            .with_partitioning(Self::output_partitioning_helper(
939                &self.input,
940                self.preserve_partitioning,
941            ));
942        self
943    }
944
945    /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
946    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
947        let children = self
948            .expr
949            .iter()
950            .map(|sort_expr| Arc::clone(&sort_expr.expr))
951            .collect::<Vec<_>>();
952        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
953            DynamicFilterPhysicalExpr::new(children, lit(true)),
954        ))))
955    }
956
957    fn cloned(&self) -> Self {
958        SortExec {
959            input: Arc::clone(&self.input),
960            expr: self.expr.clone(),
961            metrics_set: self.metrics_set.clone(),
962            preserve_partitioning: self.preserve_partitioning,
963            common_sort_prefix: self.common_sort_prefix.clone(),
964            fetch: self.fetch,
965            cache: self.cache.clone(),
966            filter: self.filter.clone(),
967        }
968    }
969
970    /// Modify how many rows to include in the result
971    ///
972    /// If None, then all rows will be returned, in sorted order.
973    /// If Some, then only the top `fetch` rows will be returned.
974    /// This can reduce the memory pressure required by the sort
975    /// operation since rows that are not going to be included
976    /// can be dropped.
977    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
978        let mut cache = self.cache.clone();
979        // If the SortExec can emit incrementally (that means the sort requirements
980        // and properties of the input match), the SortExec can generate its result
981        // without scanning the entire input when a fetch value exists.
982        let is_pipeline_friendly = matches!(
983            self.cache.emission_type,
984            EmissionType::Incremental | EmissionType::Both
985        );
986        if fetch.is_some() && is_pipeline_friendly {
987            cache = cache.with_boundedness(Boundedness::Bounded);
988        }
989        let filter = fetch.is_some().then(|| {
990            // If we already have a filter, keep it. Otherwise, create a new one.
991            self.filter.clone().unwrap_or_else(|| self.create_filter())
992        });
993        let mut new_sort = self.cloned();
994        new_sort.fetch = fetch;
995        new_sort.cache = cache;
996        new_sort.filter = filter;
997        new_sort
998    }
999
1000    /// Input schema
1001    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
1002        &self.input
1003    }
1004
1005    /// Sort expressions
1006    pub fn expr(&self) -> &LexOrdering {
1007        &self.expr
1008    }
1009
1010    /// If `Some(fetch)`, limits output to only the first "fetch" items
1011    pub fn fetch(&self) -> Option<usize> {
1012        self.fetch
1013    }
1014
1015    fn output_partitioning_helper(
1016        input: &Arc<dyn ExecutionPlan>,
1017        preserve_partitioning: bool,
1018    ) -> Partitioning {
1019        // Get output partitioning:
1020        if preserve_partitioning {
1021            input.output_partitioning().clone()
1022        } else {
1023            Partitioning::UnknownPartitioning(1)
1024        }
1025    }
1026
1027    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
1028    /// It also returns the common sort prefix between the input and the sort expressions.
1029    fn compute_properties(
1030        input: &Arc<dyn ExecutionPlan>,
1031        sort_exprs: LexOrdering,
1032        preserve_partitioning: bool,
1033    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
1034        let (sort_prefix, sort_satisfied) = input
1035            .equivalence_properties()
1036            .extract_common_sort_prefix(sort_exprs.clone())?;
1037
1038        // The emission type depends on whether the input is already sorted:
1039        // - If already fully sorted, we can emit results in the same way as the input
1040        // - If not sorted, we must wait until all data is processed to emit results (Final)
1041        let emission_type = if sort_satisfied {
1042            input.pipeline_behavior()
1043        } else {
1044            EmissionType::Final
1045        };
1046
1047        // The boundedness depends on whether the input is already sorted:
1048        // - If already sorted, we have the same property as the input
1049        // - If not sorted and input is unbounded, we require infinite memory and generates
1050        //   unbounded data (not practical).
1051        // - If not sorted and input is bounded, then the SortExec is bounded, too.
1052        let boundedness = if sort_satisfied {
1053            input.boundedness()
1054        } else {
1055            match input.boundedness() {
1056                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
1057                    requires_infinite_memory: true,
1058                },
1059                bounded => bounded,
1060            }
1061        };
1062
1063        // Calculate equivalence properties; i.e. reset the ordering equivalence
1064        // class with the new ordering:
1065        let mut eq_properties = input.equivalence_properties().clone();
1066        eq_properties.reorder(sort_exprs)?;
1067
1068        // Get output partitioning:
1069        let output_partitioning =
1070            Self::output_partitioning_helper(input, preserve_partitioning);
1071
1072        Ok((
1073            PlanProperties::new(
1074                eq_properties,
1075                output_partitioning,
1076                emission_type,
1077                boundedness,
1078            ),
1079            sort_prefix,
1080        ))
1081    }
1082}
1083
1084impl DisplayAs for SortExec {
1085    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1086        match t {
1087            DisplayFormatType::Default | DisplayFormatType::Verbose => {
1088                let preserve_partitioning = self.preserve_partitioning;
1089                match self.fetch {
1090                    Some(fetch) => {
1091                        write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
1092                        if let Some(filter) = &self.filter {
1093                            if let Ok(current) = filter.read().expr().current() {
1094                                if !current.eq(&lit(true)) {
1095                                    write!(f, ", filter=[{current}]")?;
1096                                }
1097                            }
1098                        }
1099                        if !self.common_sort_prefix.is_empty() {
1100                            write!(f, ", sort_prefix=[")?;
1101                            let mut first = true;
1102                            for sort_expr in &self.common_sort_prefix {
1103                                if first {
1104                                    first = false;
1105                                } else {
1106                                    write!(f, ", ")?;
1107                                }
1108                                write!(f, "{sort_expr}")?;
1109                            }
1110                            write!(f, "]")
1111                        } else {
1112                            Ok(())
1113                        }
1114                    }
1115                    None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
1116                }
1117            }
1118            DisplayFormatType::TreeRender => match self.fetch {
1119                Some(fetch) => {
1120                    writeln!(f, "{}", self.expr)?;
1121                    writeln!(f, "limit={fetch}")
1122                }
1123                None => {
1124                    writeln!(f, "{}", self.expr)
1125                }
1126            },
1127        }
1128    }
1129}
1130
1131impl ExecutionPlan for SortExec {
1132    fn name(&self) -> &'static str {
1133        match self.fetch {
1134            Some(_) => "SortExec(TopK)",
1135            None => "SortExec",
1136        }
1137    }
1138
1139    fn as_any(&self) -> &dyn Any {
1140        self
1141    }
1142
1143    fn properties(&self) -> &PlanProperties {
1144        &self.cache
1145    }
1146
1147    fn required_input_distribution(&self) -> Vec<Distribution> {
1148        if self.preserve_partitioning {
1149            vec![Distribution::UnspecifiedDistribution]
1150        } else {
1151            // global sort
1152            // TODO support RangePartition and OrderedDistribution
1153            vec![Distribution::SinglePartition]
1154        }
1155    }
1156
1157    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1158        vec![&self.input]
1159    }
1160
1161    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
1162        vec![false]
1163    }
1164
1165    fn with_new_children(
1166        self: Arc<Self>,
1167        children: Vec<Arc<dyn ExecutionPlan>>,
1168    ) -> Result<Arc<dyn ExecutionPlan>> {
1169        let mut new_sort = self.cloned();
1170        assert!(
1171            children.len() == 1,
1172            "SortExec should have exactly one child"
1173        );
1174        new_sort.input = Arc::clone(&children[0]);
1175        // Recompute the properties based on the new input since they may have changed
1176        let (cache, sort_prefix) = Self::compute_properties(
1177            &new_sort.input,
1178            new_sort.expr.clone(),
1179            new_sort.preserve_partitioning,
1180        )?;
1181        new_sort.cache = cache;
1182        new_sort.common_sort_prefix = sort_prefix;
1183
1184        Ok(Arc::new(new_sort))
1185    }
1186
1187    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1188        let children = self.children().into_iter().cloned().collect();
1189        let new_sort = self.with_new_children(children)?;
1190        let mut new_sort = new_sort
1191            .as_any()
1192            .downcast_ref::<SortExec>()
1193            .expect("cloned 1 lines above this line, we know the type")
1194            .clone();
1195        // Our dynamic filter and execution metrics are the state we need to reset.
1196        new_sort.filter = Some(new_sort.create_filter());
1197        new_sort.metrics_set = ExecutionPlanMetricsSet::new();
1198
1199        Ok(Arc::new(new_sort))
1200    }
1201
1202    fn execute(
1203        &self,
1204        partition: usize,
1205        context: Arc<TaskContext>,
1206    ) -> Result<SendableRecordBatchStream> {
1207        trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
1208
1209        let mut input = self.input.execute(partition, Arc::clone(&context))?;
1210
1211        let execution_options = &context.session_config().options().execution;
1212
1213        trace!("End SortExec's input.execute for partition: {partition}");
1214
1215        let sort_satisfied = self
1216            .input
1217            .equivalence_properties()
1218            .ordering_satisfy(self.expr.clone())?;
1219
1220        match (sort_satisfied, self.fetch.as_ref()) {
1221            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
1222                input,
1223                0,
1224                Some(*fetch),
1225                BaselineMetrics::new(&self.metrics_set, partition),
1226            ))),
1227            (true, None) => Ok(input),
1228            (false, Some(fetch)) => {
1229                let filter = self.filter.clone();
1230                let mut topk = TopK::try_new(
1231                    partition,
1232                    input.schema(),
1233                    self.common_sort_prefix.clone(),
1234                    self.expr.clone(),
1235                    *fetch,
1236                    context.session_config().batch_size(),
1237                    context.runtime_env(),
1238                    &self.metrics_set,
1239                    Arc::clone(&unwrap_or_internal_err!(filter)),
1240                )?;
1241                Ok(Box::pin(RecordBatchStreamAdapter::new(
1242                    self.schema(),
1243                    futures::stream::once(async move {
1244                        while let Some(batch) = input.next().await {
1245                            let batch = batch?;
1246                            topk.insert_batch(batch)?;
1247                            if topk.finished {
1248                                break;
1249                            }
1250                        }
1251                        topk.emit()
1252                    })
1253                    .try_flatten(),
1254                )))
1255            }
1256            (false, None) => {
1257                let mut sorter = ExternalSorter::new(
1258                    partition,
1259                    input.schema(),
1260                    self.expr.clone(),
1261                    context.session_config().batch_size(),
1262                    execution_options.sort_spill_reservation_bytes,
1263                    execution_options.sort_in_place_threshold_bytes,
1264                    context.session_config().spill_compression(),
1265                    &self.metrics_set,
1266                    context.runtime_env(),
1267                )?;
1268                Ok(Box::pin(RecordBatchStreamAdapter::new(
1269                    self.schema(),
1270                    futures::stream::once(async move {
1271                        while let Some(batch) = input.next().await {
1272                            let batch = batch?;
1273                            sorter.insert_batch(batch).await?;
1274                        }
1275                        sorter.sort().await
1276                    })
1277                    .try_flatten(),
1278                )))
1279            }
1280        }
1281    }
1282
1283    fn metrics(&self) -> Option<MetricsSet> {
1284        Some(self.metrics_set.clone_inner())
1285    }
1286
1287    fn statistics(&self) -> Result<Statistics> {
1288        self.partition_statistics(None)
1289    }
1290
1291    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1292        if !self.preserve_partitioning() {
1293            return self
1294                .input
1295                .partition_statistics(None)?
1296                .with_fetch(self.fetch, 0, 1);
1297        }
1298        self.input
1299            .partition_statistics(partition)?
1300            .with_fetch(self.fetch, 0, 1)
1301    }
1302
1303    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1304        Some(Arc::new(SortExec::with_fetch(self, limit)))
1305    }
1306
1307    fn fetch(&self) -> Option<usize> {
1308        self.fetch
1309    }
1310
1311    fn cardinality_effect(&self) -> CardinalityEffect {
1312        if self.fetch.is_none() {
1313            CardinalityEffect::Equal
1314        } else {
1315            CardinalityEffect::LowerEqual
1316        }
1317    }
1318
1319    /// Tries to swap the projection with its input [`SortExec`]. If it can be done,
1320    /// it returns the new swapped version having the [`SortExec`] as the top plan.
1321    /// Otherwise, it returns None.
1322    fn try_swapping_with_projection(
1323        &self,
1324        projection: &ProjectionExec,
1325    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1326        // If the projection does not narrow the schema, we should not try to push it down.
1327        if projection.expr().len() >= projection.input().schema().fields().len() {
1328            return Ok(None);
1329        }
1330
1331        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
1332        else {
1333            return Ok(None);
1334        };
1335
1336        Ok(Some(Arc::new(
1337            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
1338                .with_fetch(self.fetch())
1339                .with_preserve_partitioning(self.preserve_partitioning()),
1340        )))
1341    }
1342
1343    fn gather_filters_for_pushdown(
1344        &self,
1345        phase: FilterPushdownPhase,
1346        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1347        config: &datafusion_common::config::ConfigOptions,
1348    ) -> Result<FilterDescription> {
1349        if !matches!(phase, FilterPushdownPhase::Post) {
1350            return FilterDescription::from_children(parent_filters, &self.children());
1351        }
1352
1353        let mut child =
1354            ChildFilterDescription::from_child(&parent_filters, self.input())?;
1355
1356        if let Some(filter) = &self.filter {
1357            if config.optimizer.enable_topk_dynamic_filter_pushdown {
1358                child = child.with_self_filter(filter.read().expr());
1359            }
1360        }
1361
1362        Ok(FilterDescription::new().with_child(child))
1363    }
1364}
1365
1366#[cfg(test)]
1367mod tests {
1368    use std::collections::HashMap;
1369    use std::pin::Pin;
1370    use std::task::{Context, Poll};
1371
1372    use super::*;
1373    use crate::coalesce_partitions::CoalescePartitionsExec;
1374    use crate::collect;
1375    use crate::execution_plan::Boundedness;
1376    use crate::expressions::col;
1377    use crate::test;
1378    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
1379    use crate::test::TestMemoryExec;
1380    use crate::test::{assert_is_pending, make_partition};
1381
1382    use arrow::array::*;
1383    use arrow::compute::SortOptions;
1384    use arrow::datatypes::*;
1385    use datafusion_common::cast::as_primitive_array;
1386    use datafusion_common::test_util::batches_to_string;
1387    use datafusion_common::{DataFusionError, Result, ScalarValue};
1388    use datafusion_execution::config::SessionConfig;
1389    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1390    use datafusion_execution::RecordBatchStream;
1391    use datafusion_physical_expr::expressions::{Column, Literal};
1392    use datafusion_physical_expr::EquivalenceProperties;
1393
1394    use futures::{FutureExt, Stream};
1395    use insta::assert_snapshot;
1396
1397    #[derive(Debug, Clone)]
1398    pub struct SortedUnboundedExec {
1399        schema: Schema,
1400        batch_size: u64,
1401        cache: PlanProperties,
1402    }
1403
1404    impl DisplayAs for SortedUnboundedExec {
1405        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1406            match t {
1407                DisplayFormatType::Default
1408                | DisplayFormatType::Verbose
1409                | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1410            }
1411            Ok(())
1412        }
1413    }
1414
1415    impl SortedUnboundedExec {
1416        fn compute_properties(schema: SchemaRef) -> PlanProperties {
1417            let mut eq_properties = EquivalenceProperties::new(schema);
1418            eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
1419                Column::new("c1", 0),
1420            ))]);
1421            PlanProperties::new(
1422                eq_properties,
1423                Partitioning::UnknownPartitioning(1),
1424                EmissionType::Final,
1425                Boundedness::Unbounded {
1426                    requires_infinite_memory: false,
1427                },
1428            )
1429        }
1430    }
1431
1432    impl ExecutionPlan for SortedUnboundedExec {
1433        fn name(&self) -> &'static str {
1434            Self::static_name()
1435        }
1436
1437        fn as_any(&self) -> &dyn Any {
1438            self
1439        }
1440
1441        fn properties(&self) -> &PlanProperties {
1442            &self.cache
1443        }
1444
1445        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1446            vec![]
1447        }
1448
1449        fn with_new_children(
1450            self: Arc<Self>,
1451            _: Vec<Arc<dyn ExecutionPlan>>,
1452        ) -> Result<Arc<dyn ExecutionPlan>> {
1453            Ok(self)
1454        }
1455
1456        fn execute(
1457            &self,
1458            _partition: usize,
1459            _context: Arc<TaskContext>,
1460        ) -> Result<SendableRecordBatchStream> {
1461            Ok(Box::pin(SortedUnboundedStream {
1462                schema: Arc::new(self.schema.clone()),
1463                batch_size: self.batch_size,
1464                offset: 0,
1465            }))
1466        }
1467    }
1468
1469    #[derive(Debug)]
1470    pub struct SortedUnboundedStream {
1471        schema: SchemaRef,
1472        batch_size: u64,
1473        offset: u64,
1474    }
1475
1476    impl Stream for SortedUnboundedStream {
1477        type Item = Result<RecordBatch>;
1478
1479        fn poll_next(
1480            mut self: Pin<&mut Self>,
1481            _cx: &mut Context<'_>,
1482        ) -> Poll<Option<Self::Item>> {
1483            let batch = SortedUnboundedStream::create_record_batch(
1484                Arc::clone(&self.schema),
1485                self.offset,
1486                self.batch_size,
1487            );
1488            self.offset += self.batch_size;
1489            Poll::Ready(Some(Ok(batch)))
1490        }
1491    }
1492
1493    impl RecordBatchStream for SortedUnboundedStream {
1494        fn schema(&self) -> SchemaRef {
1495            Arc::clone(&self.schema)
1496        }
1497    }
1498
1499    impl SortedUnboundedStream {
1500        fn create_record_batch(
1501            schema: SchemaRef,
1502            offset: u64,
1503            batch_size: u64,
1504        ) -> RecordBatch {
1505            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1506            let array = UInt64Array::from(values);
1507            let array_ref: ArrayRef = Arc::new(array);
1508            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1509        }
1510    }
1511
1512    #[tokio::test]
1513    async fn test_in_mem_sort() -> Result<()> {
1514        let task_ctx = Arc::new(TaskContext::default());
1515        let partitions = 4;
1516        let csv = test::scan_partitioned(partitions);
1517        let schema = csv.schema();
1518
1519        let sort_exec = Arc::new(SortExec::new(
1520            [PhysicalSortExpr {
1521                expr: col("i", &schema)?,
1522                options: SortOptions::default(),
1523            }]
1524            .into(),
1525            Arc::new(CoalescePartitionsExec::new(csv)),
1526        ));
1527
1528        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
1529
1530        assert_eq!(result.len(), 1);
1531        assert_eq!(result[0].num_rows(), 400);
1532        assert_eq!(
1533            task_ctx.runtime_env().memory_pool.reserved(),
1534            0,
1535            "The sort should have returned all memory used back to the memory manager"
1536        );
1537
1538        Ok(())
1539    }
1540
1541    #[tokio::test]
1542    async fn test_sort_spill() -> Result<()> {
1543        // trigger spill w/ 100 batches
1544        let session_config = SessionConfig::new();
1545        let sort_spill_reservation_bytes = session_config
1546            .options()
1547            .execution
1548            .sort_spill_reservation_bytes;
1549        let runtime = RuntimeEnvBuilder::new()
1550            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
1551            .build_arc()?;
1552        let task_ctx = Arc::new(
1553            TaskContext::default()
1554                .with_session_config(session_config)
1555                .with_runtime(runtime),
1556        );
1557
1558        // The input has 100 partitions, each partition has a batch containing 100 rows.
1559        // Each row has a single Int32 column with values 0..100. The total size of the
1560        // input is roughly 40000 bytes.
1561        let partitions = 100;
1562        let input = test::scan_partitioned(partitions);
1563        let schema = input.schema();
1564
1565        let sort_exec = Arc::new(SortExec::new(
1566            [PhysicalSortExpr {
1567                expr: col("i", &schema)?,
1568                options: SortOptions::default(),
1569            }]
1570            .into(),
1571            Arc::new(CoalescePartitionsExec::new(input)),
1572        ));
1573
1574        let result = collect(
1575            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
1576            Arc::clone(&task_ctx),
1577        )
1578        .await?;
1579
1580        assert_eq!(result.len(), 2);
1581
1582        // Now, validate metrics
1583        let metrics = sort_exec.metrics().unwrap();
1584
1585        assert_eq!(metrics.output_rows().unwrap(), 10000);
1586        assert!(metrics.elapsed_compute().unwrap() > 0);
1587
1588        let spill_count = metrics.spill_count().unwrap();
1589        let spilled_rows = metrics.spilled_rows().unwrap();
1590        let spilled_bytes = metrics.spilled_bytes().unwrap();
1591        // Processing 40000 bytes of data using 12288 bytes of memory requires 3 spills
1592        // unless we do something really clever. It will spill roughly 9000+ rows and 36000
1593        // bytes. We leave a little wiggle room for the actual numbers.
1594        assert!((3..=10).contains(&spill_count));
1595        assert!((9000..=10000).contains(&spilled_rows));
1596        assert!((38000..=44000).contains(&spilled_bytes));
1597
1598        let columns = result[0].columns();
1599
1600        let i = as_primitive_array::<Int32Type>(&columns[0])?;
1601        assert_eq!(i.value(0), 0);
1602        assert_eq!(i.value(i.len() - 1), 81);
1603        assert_eq!(
1604            task_ctx.runtime_env().memory_pool.reserved(),
1605            0,
1606            "The sort should have returned all memory used back to the memory manager"
1607        );
1608
1609        Ok(())
1610    }
1611
1612    #[tokio::test]
1613    async fn test_batch_reservation_error() -> Result<()> {
1614        // Pick a memory limit and sort_spill_reservation that make the first batch reservation fail.
1615        // These values assume that the ExternalSorter will reserve 800 bytes for the first batch.
1616        let expected_batch_reservation = 800;
1617        let merge_reservation: usize = 0; // Set to 0 for simplicity
1618        let memory_limit: usize = expected_batch_reservation + merge_reservation - 1; // Just short of what we need
1619
1620        let session_config =
1621            SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
1622        let runtime = RuntimeEnvBuilder::new()
1623            .with_memory_limit(memory_limit, 1.0)
1624            .build_arc()?;
1625        let task_ctx = Arc::new(
1626            TaskContext::default()
1627                .with_session_config(session_config)
1628                .with_runtime(runtime),
1629        );
1630
1631        let plan = test::scan_partitioned(1);
1632
1633        // Read the first record batch to assert that our memory limit and sort_spill_reservation
1634        // settings trigger the test scenario.
1635        {
1636            let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
1637            let first_batch = stream.next().await.unwrap()?;
1638            let batch_reservation = get_reserved_byte_for_record_batch(&first_batch);
1639
1640            assert_eq!(batch_reservation, expected_batch_reservation);
1641            assert!(memory_limit < (merge_reservation + batch_reservation));
1642        }
1643
1644        let sort_exec = Arc::new(SortExec::new(
1645            [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
1646            plan,
1647        ));
1648
1649        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;
1650
1651        let err = result.unwrap_err();
1652        assert!(
1653            matches!(err, DataFusionError::Context(..)),
1654            "Assertion failed: expected a Context error, but got: {err:?}"
1655        );
1656
1657        // Assert that the context error is wrapping a resources exhausted error.
1658        assert!(
1659            matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
1660            "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
1661        );
1662
1663        Ok(())
1664    }
1665
1666    #[tokio::test]
1667    async fn test_sort_spill_utf8_strings() -> Result<()> {
1668        let session_config = SessionConfig::new()
1669            .with_batch_size(100)
1670            .with_sort_in_place_threshold_bytes(20 * 1024)
1671            .with_sort_spill_reservation_bytes(100 * 1024);
1672        let runtime = RuntimeEnvBuilder::new()
1673            .with_memory_limit(500 * 1024, 1.0)
1674            .build_arc()?;
1675        let task_ctx = Arc::new(
1676            TaskContext::default()
1677                .with_session_config(session_config)
1678                .with_runtime(runtime),
1679        );
1680
1681        // The input has 200 partitions, each partition has a batch containing 100 rows.
1682        // Each row has a single Utf8 column, the Utf8 string values are roughly 42 bytes.
1683        // The total size of the input is roughly 8.4 KB.
1684        let input = test::scan_partitioned_utf8(200);
1685        let schema = input.schema();
1686
1687        let sort_exec = Arc::new(SortExec::new(
1688            [PhysicalSortExpr {
1689                expr: col("i", &schema)?,
1690                options: SortOptions::default(),
1691            }]
1692            .into(),
1693            Arc::new(CoalescePartitionsExec::new(input)),
1694        ));
1695
1696        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1697
1698        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
1699        assert_eq!(num_rows, 20000);
1700
1701        // Now, validate metrics
1702        let metrics = sort_exec.metrics().unwrap();
1703
1704        assert_eq!(metrics.output_rows().unwrap(), 20000);
1705        assert!(metrics.elapsed_compute().unwrap() > 0);
1706
1707        let spill_count = metrics.spill_count().unwrap();
1708        let spilled_rows = metrics.spilled_rows().unwrap();
1709        let spilled_bytes = metrics.spilled_bytes().unwrap();
1710
1711        // This test case is processing 840KB of data using 400KB of memory. Note
1712        // that buffered batches can't be dropped until all sorted batches are
1713        // generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
1714        // batches.
1715        // The number of spills is roughly calculated as:
1716        //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
1717
1718        // If this assertion fail with large spill count, make sure the following
1719        // case does not happen:
1720        // During external sorting, one sorted run should be spilled to disk in a
1721        // single file, due to memory limit we might need to append to the file
1722        // multiple times to spill all the data. Make sure we're not writing each
1723        // appending as a separate file.
1724        assert!((4..=8).contains(&spill_count));
1725        assert!((15000..=20000).contains(&spilled_rows));
1726        assert!((900000..=1000000).contains(&spilled_bytes));
1727
1728        // Verify that the result is sorted
1729        let concated_result = concat_batches(&schema, &result)?;
1730        let columns = concated_result.columns();
1731        let string_array = as_string_array(&columns[0]);
1732        for i in 0..string_array.len() - 1 {
1733            assert!(string_array.value(i) <= string_array.value(i + 1));
1734        }
1735
1736        assert_eq!(
1737            task_ctx.runtime_env().memory_pool.reserved(),
1738            0,
1739            "The sort should have returned all memory used back to the memory manager"
1740        );
1741
1742        Ok(())
1743    }
1744
1745    #[tokio::test]
1746    async fn test_sort_fetch_memory_calculation() -> Result<()> {
1747        // This test mirrors down the size from the example above.
1748        let avg_batch_size = 400;
1749        let partitions = 4;
1750
1751        // A tuple of (fetch, expect_spillage)
1752        let test_options = vec![
1753            // Since we don't have a limit (and the memory is less than the total size of
1754            // all the batches we are processing, we expect it to spill.
1755            (None, true),
1756            // When we have a limit however, the buffered size of batches should fit in memory
1757            // since it is much lower than the total size of the input batch.
1758            (Some(1), false),
1759        ];
1760
1761        for (fetch, expect_spillage) in test_options {
1762            let session_config = SessionConfig::new();
1763            let sort_spill_reservation_bytes = session_config
1764                .options()
1765                .execution
1766                .sort_spill_reservation_bytes;
1767
1768            let runtime = RuntimeEnvBuilder::new()
1769                .with_memory_limit(
1770                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1771                    1.0,
1772                )
1773                .build_arc()?;
1774            let task_ctx = Arc::new(
1775                TaskContext::default()
1776                    .with_runtime(runtime)
1777                    .with_session_config(session_config),
1778            );
1779
1780            let csv = test::scan_partitioned(partitions);
1781            let schema = csv.schema();
1782
1783            let sort_exec = Arc::new(
1784                SortExec::new(
1785                    [PhysicalSortExpr {
1786                        expr: col("i", &schema)?,
1787                        options: SortOptions::default(),
1788                    }]
1789                    .into(),
1790                    Arc::new(CoalescePartitionsExec::new(csv)),
1791                )
1792                .with_fetch(fetch),
1793            );
1794
1795            let result =
1796                collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1797            assert_eq!(result.len(), 1);
1798
1799            let metrics = sort_exec.metrics().unwrap();
1800            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
1801            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
1802        }
1803        Ok(())
1804    }
1805
1806    #[tokio::test]
1807    async fn test_sort_metadata() -> Result<()> {
1808        let task_ctx = Arc::new(TaskContext::default());
1809        let field_metadata: HashMap<String, String> =
1810            vec![("foo".to_string(), "bar".to_string())]
1811                .into_iter()
1812                .collect();
1813        let schema_metadata: HashMap<String, String> =
1814            vec![("baz".to_string(), "barf".to_string())]
1815                .into_iter()
1816                .collect();
1817
1818        let mut field = Field::new("field_name", DataType::UInt64, true);
1819        field.set_metadata(field_metadata.clone());
1820        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1821        let schema = Arc::new(schema);
1822
1823        let data: ArrayRef =
1824            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1825
1826        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1827        let input =
1828            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1829
1830        let sort_exec = Arc::new(SortExec::new(
1831            [PhysicalSortExpr {
1832                expr: col("field_name", &schema)?,
1833                options: SortOptions::default(),
1834            }]
1835            .into(),
1836            input,
1837        ));
1838
1839        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1840
1841        let expected_data: ArrayRef =
1842            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1843        let expected_batch =
1844            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
1845
1846        // Data is correct
1847        assert_eq!(&vec![expected_batch], &result);
1848
1849        // explicitly ensure the metadata is present
1850        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
1851        assert_eq!(result[0].schema().metadata(), &schema_metadata);
1852
1853        Ok(())
1854    }
1855
1856    #[tokio::test]
1857    async fn test_lex_sort_by_mixed_types() -> Result<()> {
1858        let task_ctx = Arc::new(TaskContext::default());
1859        let schema = Arc::new(Schema::new(vec![
1860            Field::new("a", DataType::Int32, true),
1861            Field::new(
1862                "b",
1863                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1864                true,
1865            ),
1866        ]));
1867
1868        // define data.
1869        let batch = RecordBatch::try_new(
1870            Arc::clone(&schema),
1871            vec![
1872                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
1873                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1874                    Some(vec![Some(3)]),
1875                    Some(vec![Some(1)]),
1876                    Some(vec![Some(6), None]),
1877                    Some(vec![Some(5)]),
1878                ])),
1879            ],
1880        )?;
1881
1882        let sort_exec = Arc::new(SortExec::new(
1883            [
1884                PhysicalSortExpr {
1885                    expr: col("a", &schema)?,
1886                    options: SortOptions {
1887                        descending: false,
1888                        nulls_first: true,
1889                    },
1890                },
1891                PhysicalSortExpr {
1892                    expr: col("b", &schema)?,
1893                    options: SortOptions {
1894                        descending: true,
1895                        nulls_first: false,
1896                    },
1897                },
1898            ]
1899            .into(),
1900            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
1901        ));
1902
1903        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
1904        assert_eq!(
1905            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1906            *sort_exec.schema().field(1).data_type()
1907        );
1908
1909        let result: Vec<RecordBatch> =
1910            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
1911        let metrics = sort_exec.metrics().unwrap();
1912        assert!(metrics.elapsed_compute().unwrap() > 0);
1913        assert_eq!(metrics.output_rows().unwrap(), 4);
1914        assert_eq!(result.len(), 1);
1915
1916        let expected = RecordBatch::try_new(
1917            schema,
1918            vec![
1919                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
1920                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1921                    Some(vec![Some(1)]),
1922                    Some(vec![Some(6), None]),
1923                    Some(vec![Some(5)]),
1924                    Some(vec![Some(3)]),
1925                ])),
1926            ],
1927        )?;
1928
1929        assert_eq!(expected, result[0]);
1930
1931        Ok(())
1932    }
1933
1934    #[tokio::test]
1935    async fn test_lex_sort_by_float() -> Result<()> {
1936        let task_ctx = Arc::new(TaskContext::default());
1937        let schema = Arc::new(Schema::new(vec![
1938            Field::new("a", DataType::Float32, true),
1939            Field::new("b", DataType::Float64, true),
1940        ]));
1941
1942        // define data.
1943        let batch = RecordBatch::try_new(
1944            Arc::clone(&schema),
1945            vec![
1946                Arc::new(Float32Array::from(vec![
1947                    Some(f32::NAN),
1948                    None,
1949                    None,
1950                    Some(f32::NAN),
1951                    Some(1.0_f32),
1952                    Some(1.0_f32),
1953                    Some(2.0_f32),
1954                    Some(3.0_f32),
1955                ])),
1956                Arc::new(Float64Array::from(vec![
1957                    Some(200.0_f64),
1958                    Some(20.0_f64),
1959                    Some(10.0_f64),
1960                    Some(100.0_f64),
1961                    Some(f64::NAN),
1962                    None,
1963                    None,
1964                    Some(f64::NAN),
1965                ])),
1966            ],
1967        )?;
1968
1969        let sort_exec = Arc::new(SortExec::new(
1970            [
1971                PhysicalSortExpr {
1972                    expr: col("a", &schema)?,
1973                    options: SortOptions {
1974                        descending: true,
1975                        nulls_first: true,
1976                    },
1977                },
1978                PhysicalSortExpr {
1979                    expr: col("b", &schema)?,
1980                    options: SortOptions {
1981                        descending: false,
1982                        nulls_first: false,
1983                    },
1984                },
1985            ]
1986            .into(),
1987            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
1988        ));
1989
1990        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
1991        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
1992
1993        let result: Vec<RecordBatch> =
1994            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
1995        let metrics = sort_exec.metrics().unwrap();
1996        assert!(metrics.elapsed_compute().unwrap() > 0);
1997        assert_eq!(metrics.output_rows().unwrap(), 8);
1998        assert_eq!(result.len(), 1);
1999
2000        let columns = result[0].columns();
2001
2002        assert_eq!(DataType::Float32, *columns[0].data_type());
2003        assert_eq!(DataType::Float64, *columns[1].data_type());
2004
2005        let a = as_primitive_array::<Float32Type>(&columns[0])?;
2006        let b = as_primitive_array::<Float64Type>(&columns[1])?;
2007
2008        // convert result to strings to allow comparing to expected result containing NaN
2009        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
2010            .map(|i| {
2011                let aval = if a.is_valid(i) {
2012                    Some(a.value(i).to_string())
2013                } else {
2014                    None
2015                };
2016                let bval = if b.is_valid(i) {
2017                    Some(b.value(i).to_string())
2018                } else {
2019                    None
2020                };
2021                (aval, bval)
2022            })
2023            .collect();
2024
2025        let expected: Vec<(Option<String>, Option<String>)> = vec![
2026            (None, Some("10".to_owned())),
2027            (None, Some("20".to_owned())),
2028            (Some("NaN".to_owned()), Some("100".to_owned())),
2029            (Some("NaN".to_owned()), Some("200".to_owned())),
2030            (Some("3".to_owned()), Some("NaN".to_owned())),
2031            (Some("2".to_owned()), None),
2032            (Some("1".to_owned()), Some("NaN".to_owned())),
2033            (Some("1".to_owned()), None),
2034        ];
2035
2036        assert_eq!(expected, result);
2037
2038        Ok(())
2039    }
2040
2041    #[tokio::test]
2042    async fn test_drop_cancel() -> Result<()> {
2043        let task_ctx = Arc::new(TaskContext::default());
2044        let schema =
2045            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2046
2047        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2048        let refs = blocking_exec.refs();
2049        let sort_exec = Arc::new(SortExec::new(
2050            [PhysicalSortExpr {
2051                expr: col("a", &schema)?,
2052                options: SortOptions::default(),
2053            }]
2054            .into(),
2055            blocking_exec,
2056        ));
2057
2058        let fut = collect(sort_exec, Arc::clone(&task_ctx));
2059        let mut fut = fut.boxed();
2060
2061        assert_is_pending(&mut fut);
2062        drop(fut);
2063        assert_strong_count_converges_to_zero(refs).await;
2064
2065        assert_eq!(
2066            task_ctx.runtime_env().memory_pool.reserved(),
2067            0,
2068            "The sort should have returned all memory used back to the memory manager"
2069        );
2070
2071        Ok(())
2072    }
2073
2074    #[test]
2075    fn test_empty_sort_batch() {
2076        let schema = Arc::new(Schema::empty());
2077        let options = RecordBatchOptions::new().with_row_count(Some(1));
2078        let batch =
2079            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
2080                .unwrap();
2081
2082        let expressions = [PhysicalSortExpr {
2083            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2084            options: SortOptions::default(),
2085        }]
2086        .into();
2087
2088        let result = sort_batch(&batch, &expressions, None).unwrap();
2089        assert_eq!(result.num_rows(), 1);
2090    }
2091
2092    #[tokio::test]
2093    async fn topk_unbounded_source() -> Result<()> {
2094        let task_ctx = Arc::new(TaskContext::default());
2095        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
2096        let source = SortedUnboundedExec {
2097            schema: schema.clone(),
2098            batch_size: 2,
2099            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
2100        };
2101        let mut plan = SortExec::new(
2102            [PhysicalSortExpr::new_default(Arc::new(Column::new(
2103                "c1", 0,
2104            )))]
2105            .into(),
2106            Arc::new(source),
2107        );
2108        plan = plan.with_fetch(Some(9));
2109
2110        let batches = collect(Arc::new(plan), task_ctx).await?;
2111        assert_snapshot!(batches_to_string(&batches), @r#"
2112            +----+
2113            | c1 |
2114            +----+
2115            | 0  |
2116            | 1  |
2117            | 2  |
2118            | 3  |
2119            | 4  |
2120            | 5  |
2121            | 6  |
2122            | 7  |
2123            | 8  |
2124            +----+
2125            "#);
2126        Ok(())
2127    }
2128
2129    #[tokio::test]
2130    async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2131        let batch_size = 100;
2132
2133        let create_task_ctx = |_: &[RecordBatch]| {
2134            TaskContext::default().with_session_config(
2135                SessionConfig::new()
2136                    .with_batch_size(batch_size)
2137                    .with_sort_in_place_threshold_bytes(usize::MAX),
2138            )
2139        };
2140
2141        // Smaller than batch size and require more than a single batch to get the requested batch size
2142        test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2143
2144        // Not evenly divisible by batch size
2145        test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2146
2147        // Evenly divisible by batch size and is larger than 2 output batches
2148        test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2149
2150        Ok(())
2151    }
2152
2153    #[tokio::test]
2154    async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place(
2155    ) -> Result<()> {
2156        let batch_size = 100;
2157
2158        let create_task_ctx = |_: &[RecordBatch]| {
2159            TaskContext::default().with_session_config(
2160                SessionConfig::new()
2161                    .with_batch_size(batch_size)
2162                    .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2163            )
2164        };
2165
2166        // Smaller than batch size and require more than a single batch to get the requested batch size
2167        {
2168            let metrics =
2169                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2170
2171            assert_eq!(
2172                metrics.spill_count(),
2173                Some(0),
2174                "Expected no spills when sorting in place"
2175            );
2176        }
2177
2178        // Not evenly divisible by batch size
2179        {
2180            let metrics =
2181                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2182
2183            assert_eq!(
2184                metrics.spill_count(),
2185                Some(0),
2186                "Expected no spills when sorting in place"
2187            );
2188        }
2189
2190        // Evenly divisible by batch size and is larger than 2 output batches
2191        {
2192            let metrics =
2193                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2194
2195            assert_eq!(
2196                metrics.spill_count(),
2197                Some(0),
2198                "Expected no spills when sorting in place"
2199            );
2200        }
2201
2202        Ok(())
2203    }
2204
2205    #[tokio::test]
2206    async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch(
2207    ) -> Result<()> {
2208        let batch_size = 100;
2209
2210        let create_task_ctx = |_: &[RecordBatch]| {
2211            TaskContext::default()
2212                .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2213        };
2214
2215        // Smaller than batch size and require more than a single batch to get the requested batch size
2216        {
2217            let metrics = test_sort_output_batch_size(
2218                // Single batch
2219                1,
2220                batch_size / 4,
2221                create_task_ctx,
2222            )
2223            .await?;
2224
2225            assert_eq!(
2226                metrics.spill_count(),
2227                Some(0),
2228                "Expected no spills when sorting in place"
2229            );
2230        }
2231
2232        // Not evenly divisible by batch size
2233        {
2234            let metrics = test_sort_output_batch_size(
2235                // Single batch
2236                1,
2237                batch_size + 7,
2238                create_task_ctx,
2239            )
2240            .await?;
2241
2242            assert_eq!(
2243                metrics.spill_count(),
2244                Some(0),
2245                "Expected no spills when sorting in place"
2246            );
2247        }
2248
2249        // Evenly divisible by batch size and is larger than 2 output batches
2250        {
2251            let metrics = test_sort_output_batch_size(
2252                // Single batch
2253                1,
2254                batch_size * 3,
2255                create_task_ctx,
2256            )
2257            .await?;
2258
2259            assert_eq!(
2260                metrics.spill_count(),
2261                Some(0),
2262                "Expected no spills when sorting in place"
2263            );
2264        }
2265
2266        Ok(())
2267    }
2268
2269    #[tokio::test]
2270    async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill(
2271    ) -> Result<()> {
2272        let batch_size = 100;
2273
2274        let create_task_ctx = |generated_batches: &[RecordBatch]| {
2275            let batches_memory = generated_batches
2276                .iter()
2277                .map(|b| b.get_array_memory_size())
2278                .sum::<usize>();
2279
2280            TaskContext::default()
2281                .with_session_config(
2282                    SessionConfig::new()
2283                        .with_batch_size(batch_size)
2284                        // To make sure there is no in place sorting
2285                        .with_sort_in_place_threshold_bytes(1)
2286                        .with_sort_spill_reservation_bytes(1),
2287                )
2288                .with_runtime(
2289                    RuntimeEnvBuilder::default()
2290                        .with_memory_limit(batches_memory, 1.0)
2291                        .build_arc()
2292                        .unwrap(),
2293                )
2294        };
2295
2296        // Smaller than batch size and require more than a single batch to get the requested batch size
2297        {
2298            let metrics =
2299                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2300
2301            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2302        }
2303
2304        // Not evenly divisible by batch size
2305        {
2306            let metrics =
2307                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2308
2309            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2310        }
2311
2312        // Evenly divisible by batch size and is larger than 2 batches
2313        {
2314            let metrics =
2315                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2316
2317            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2318        }
2319
2320        Ok(())
2321    }
2322
2323    async fn test_sort_output_batch_size(
2324        number_of_batches: usize,
2325        batch_size_to_generate: usize,
2326        create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2327    ) -> Result<MetricsSet> {
2328        let batches = (0..number_of_batches)
2329            .map(|_| make_partition(batch_size_to_generate as i32))
2330            .collect::<Vec<_>>();
2331        let task_ctx = create_task_ctx(batches.as_slice());
2332
2333        let expected_batch_size = task_ctx.session_config().batch_size();
2334
2335        let (mut output_batches, metrics) =
2336            run_sort_on_input(task_ctx, "i", batches).await?;
2337
2338        let last_batch = output_batches.pop().unwrap();
2339
2340        for batch in output_batches {
2341            assert_eq!(batch.num_rows(), expected_batch_size);
2342        }
2343
2344        let mut last_expected_batch_size =
2345            (batch_size_to_generate * number_of_batches) % expected_batch_size;
2346        if last_expected_batch_size == 0 {
2347            last_expected_batch_size = expected_batch_size;
2348        }
2349        assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2350
2351        Ok(metrics)
2352    }
2353
2354    async fn run_sort_on_input(
2355        task_ctx: TaskContext,
2356        order_by_col: &str,
2357        batches: Vec<RecordBatch>,
2358    ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2359        let task_ctx = Arc::new(task_ctx);
2360
2361        // let task_ctx = env.
2362        let schema = batches[0].schema();
2363        let ordering: LexOrdering = [PhysicalSortExpr {
2364            expr: col(order_by_col, &schema)?,
2365            options: SortOptions {
2366                descending: false,
2367                nulls_first: true,
2368            },
2369        }]
2370        .into();
2371        let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2372            ordering.clone(),
2373            TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2374        ));
2375
2376        let sorted_batches =
2377            collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2378
2379        let metrics = sort_exec.metrics().expect("sort have metrics");
2380
2381        // assert output
2382        {
2383            let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2384            let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2385
2386            let sorted_batches_concat =
2387                concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2388
2389            assert_eq!(sorted_input_batch, sorted_batches_concat);
2390        }
2391
2392        Ok((sorted_batches, metrics))
2393    }
2394}