datafusion_physical_plan/sorts/
sort.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Sort that deals with an arbitrary size of the input.
19//! It will do in-memory sorting if it has enough memory budget
20//! but spills to disk if needed.
21
22use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28
29use crate::common::spawn_buffered;
30use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
31use crate::expressions::PhysicalSortExpr;
32use crate::filter_pushdown::{
33    ChildFilterDescription, FilterDescription, FilterPushdownPhase,
34};
35use crate::limit::LimitStream;
36use crate::metrics::{
37    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, SplitMetrics,
38};
39use crate::projection::{make_with_child, update_ordering, ProjectionExec};
40use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
41use crate::spill::get_record_batch_memory_size;
42use crate::spill::in_progress_spill_file::InProgressSpillFile;
43use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
44use crate::stream::BatchSplitStream;
45use crate::stream::RecordBatchStreamAdapter;
46use crate::topk::TopK;
47use crate::topk::TopKDynamicFilters;
48use crate::{
49    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
50    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
51    Statistics,
52};
53
54use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
55use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
56use arrow::datatypes::SchemaRef;
57use datafusion_common::config::SpillCompression;
58use datafusion_common::{
59    internal_datafusion_err, internal_err, unwrap_or_internal_err, DataFusionError,
60    Result,
61};
62use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
63use datafusion_execution::runtime_env::RuntimeEnv;
64use datafusion_execution::TaskContext;
65use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
66use datafusion_physical_expr::LexOrdering;
67use datafusion_physical_expr::PhysicalExpr;
68
69use futures::{StreamExt, TryStreamExt};
70use log::{debug, trace};
71
72struct ExternalSorterMetrics {
73    /// metrics
74    baseline: BaselineMetrics,
75
76    spill_metrics: SpillMetrics,
77
78    split_metrics: SplitMetrics,
79}
80
81impl ExternalSorterMetrics {
82    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
83        Self {
84            baseline: BaselineMetrics::new(metrics, partition),
85            spill_metrics: SpillMetrics::new(metrics, partition),
86            split_metrics: SplitMetrics::new(metrics, partition),
87        }
88    }
89}
90
91/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
92/// a total order. Depending on the input size and memory manager
93/// configuration, writes intermediate results to disk ("spills")
94/// using Arrow IPC format.
95///
96/// # Algorithm
97///
98/// 1. get a non-empty new batch from input
99///
100/// 2. check with the memory manager there is sufficient space to
101///    buffer the batch in memory.
102///
103/// 2.1 if memory is sufficient, buffer batch in memory, go to 1.
104///
105/// 2.2 if no more memory is available, sort all buffered batches and
106///     spill to file.  buffer the next batch in memory, go to 1.
107///
108/// 3. when input is exhausted, merge all in memory batches and spills
109///    to get a total order.
110///
111/// # When data fits in available memory
112///
113/// If there is sufficient memory, data is sorted in memory to produce the output
114///
115/// ```text
116///    ┌─────┐
117///    │  2  │
118///    │  3  │
119///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
120///    │  4  │
121///    │  2  │                  │
122///    └─────┘                  ▼
123///    ┌─────┐
124///    │  1  │              In memory
125///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
126///    │  1  │
127///    └─────┘                  ▲
128///      ...                    │
129///
130///    ┌─────┐                  │
131///    │  4  │
132///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
133///    └─────┘
134///
135/// in_mem_batches
136///
137/// ```
138///
139/// # When data does not fit in available memory
140///
141///  When memory is exhausted, data is first sorted and written to one
142///  or more spill files on disk:
143///
144/// ```text
145///    ┌─────┐                               .─────────────────.
146///    │  2  │                              (                   )
147///    │  3  │                              │`─────────────────'│
148///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
149///    │  4  │             │                │  │ 1  │░          │
150///    │  2  │                              │  │... │░          │
151///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
152///    ┌─────┐                              │  └────┘░    1  │░ │
153///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
154///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
155///    │  1  │     and write to file        │           │    ░░ │
156///    └─────┘                              │             4  │░ │
157///      ...               ▲                │           └░─░─░░ │
158///                        │                │            ░░░░░░ │
159///    ┌─────┐                              │.─────────────────.│
160///    │  4  │             │                (                   )
161///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
162///    └─────┘
163///
164/// in_mem_batches                                  spills
165///                                         (file on disk in Arrow
166///                                               IPC format)
167/// ```
168///
169/// Once the input is completely read, the spill files are read and
170/// merged with any in memory batches to produce a single total sorted
171/// output:
172///
173/// ```text
174///   .─────────────────.
175///  (                   )
176///  │`─────────────────'│
177///  │  ┌────┐           │
178///  │  │ 1  │░          │
179///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
180///  │  │ 4  │░ ┌────┐   │           │
181///  │  └────┘░ │ 1  │░  │           ▼
182///  │   ░░░░░░ │    │░  │
183///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
184///  │          │    │░  │
185///  │          │ 4  │░  │           ▲
186///  │          └────┘░  │           │
187///  │           ░░░░░░  │
188///  │.─────────────────.│           │
189///  (                   )
190///   `─────────────────'            │
191///         spills
192///                                  │
193///
194///                                  │
195///
196///     ┌─────┐                      │
197///     │  1  │
198///     │  4  │─ ─ ─ ─               │
199///     └─────┘       │
200///       ...                   In memory
201///                   └ ─ ─ ─▶  sort/merge
202///     ┌─────┐
203///     │  4  │                      ▲
204///     │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
205///     └─────┘
206///
207///  in_mem_batches
208/// ```
209struct ExternalSorter {
210    // ========================================================================
211    // PROPERTIES:
212    // Fields that define the sorter's configuration and remain constant
213    // ========================================================================
214    /// Schema of the output (and the input)
215    schema: SchemaRef,
216    /// Sort expressions
217    expr: LexOrdering,
218    /// The target number of rows for output batches
219    batch_size: usize,
220    /// If the in size of buffered memory batches is below this size,
221    /// the data will be concatenated and sorted in place rather than
222    /// sort/merged.
223    sort_in_place_threshold_bytes: usize,
224
225    // ========================================================================
226    // STATE BUFFERS:
227    // Fields that hold intermediate data during sorting
228    // ========================================================================
229    /// Unsorted input batches stored in the memory buffer
230    in_mem_batches: Vec<RecordBatch>,
231
232    /// During external sorting, in-memory intermediate data will be appended to
233    /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
234    ///
235    /// this is a tuple of:
236    /// 1. `InProgressSpillFile` - the file that is being written to
237    /// 2. `max_record_batch_memory` - the maximum memory usage of a single batch in this spill file.
238    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
239    /// If data has previously been spilled, the locations of the spill files (in
240    /// Arrow IPC format)
241    /// Within the same spill file, the data might be chunked into multiple batches,
242    /// and ordered by sort keys.
243    finished_spill_files: Vec<SortedSpillFile>,
244
245    // ========================================================================
246    // EXECUTION RESOURCES:
247    // Fields related to managing execution resources and monitoring performance.
248    // ========================================================================
249    /// Runtime metrics
250    metrics: ExternalSorterMetrics,
251    /// A handle to the runtime to get spill files
252    runtime: Arc<RuntimeEnv>,
253    /// Reservation for in_mem_batches
254    reservation: MemoryReservation,
255    spill_manager: SpillManager,
256
257    /// Reservation for the merging of in-memory batches. If the sort
258    /// might spill, `sort_spill_reservation_bytes` will be
259    /// pre-reserved to ensure there is some space for this sort/merge.
260    merge_reservation: MemoryReservation,
261    /// How much memory to reserve for performing in-memory sort/merges
262    /// prior to spilling.
263    sort_spill_reservation_bytes: usize,
264}
265
266impl ExternalSorter {
267    // TODO: make a builder or some other nicer API to avoid the
268    // clippy warning
269    #[allow(clippy::too_many_arguments)]
270    pub fn new(
271        partition_id: usize,
272        schema: SchemaRef,
273        expr: LexOrdering,
274        batch_size: usize,
275        sort_spill_reservation_bytes: usize,
276        sort_in_place_threshold_bytes: usize,
277        // Configured via `datafusion.execution.spill_compression`.
278        spill_compression: SpillCompression,
279        metrics: &ExecutionPlanMetricsSet,
280        runtime: Arc<RuntimeEnv>,
281    ) -> Result<Self> {
282        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
283        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
284            .with_can_spill(true)
285            .register(&runtime.memory_pool);
286
287        let merge_reservation =
288            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
289                .register(&runtime.memory_pool);
290
291        let spill_manager = SpillManager::new(
292            Arc::clone(&runtime),
293            metrics.spill_metrics.clone(),
294            Arc::clone(&schema),
295        )
296        .with_compression_type(spill_compression);
297
298        Ok(Self {
299            schema,
300            in_mem_batches: vec![],
301            in_progress_spill_file: None,
302            finished_spill_files: vec![],
303            expr,
304            metrics,
305            reservation,
306            spill_manager,
307            merge_reservation,
308            runtime,
309            batch_size,
310            sort_spill_reservation_bytes,
311            sort_in_place_threshold_bytes,
312        })
313    }
314
315    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
316    ///
317    /// Updates memory usage metrics, and possibly triggers spilling to disk
318    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
319        if input.num_rows() == 0 {
320            return Ok(());
321        }
322
323        self.reserve_memory_for_merge()?;
324        self.reserve_memory_for_batch_and_maybe_spill(&input)
325            .await?;
326
327        self.in_mem_batches.push(input);
328        Ok(())
329    }
330
331    fn spilled_before(&self) -> bool {
332        !self.finished_spill_files.is_empty()
333    }
334
335    /// Returns the final sorted output of all batches inserted via
336    /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
337    ///
338    /// This process could either be:
339    ///
340    /// 1. An in-memory sort/merge (if the input fit in memory)
341    ///
342    /// 2. A combined streaming merge incorporating both in-memory
343    ///    batches and data from spill files on disk.
344    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
345        // Release the memory reserved for merge back to the pool so
346        // there is some left when `in_mem_sort_stream` requests an
347        // allocation.
348        self.merge_reservation.free();
349
350        if self.spilled_before() {
351            // Sort `in_mem_batches` and spill it first. If there are many
352            // `in_mem_batches` and the memory limit is almost reached, merging
353            // them with the spilled files at the same time might cause OOM.
354            if !self.in_mem_batches.is_empty() {
355                self.sort_and_spill_in_mem_batches().await?;
356            }
357
358            StreamingMergeBuilder::new()
359                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
360                .with_spill_manager(self.spill_manager.clone())
361                .with_schema(Arc::clone(&self.schema))
362                .with_expressions(&self.expr.clone())
363                .with_metrics(self.metrics.baseline.clone())
364                .with_batch_size(self.batch_size)
365                .with_fetch(None)
366                .with_reservation(self.merge_reservation.new_empty())
367                .build()
368        } else {
369            self.in_mem_sort_stream(self.metrics.baseline.clone())
370        }
371    }
372
373    /// How much memory is buffered in this `ExternalSorter`?
374    fn used(&self) -> usize {
375        self.reservation.size()
376    }
377
378    /// How many bytes have been spilled to disk?
379    fn spilled_bytes(&self) -> usize {
380        self.metrics.spill_metrics.spilled_bytes.value()
381    }
382
383    /// How many rows have been spilled to disk?
384    fn spilled_rows(&self) -> usize {
385        self.metrics.spill_metrics.spilled_rows.value()
386    }
387
388    /// How many spill files have been created?
389    fn spill_count(&self) -> usize {
390        self.metrics.spill_metrics.spill_file_count.value()
391    }
392
393    /// Appending globally sorted batches to the in-progress spill file, and clears
394    /// the `globally_sorted_batches` (also its memory reservation) afterwards.
395    async fn consume_and_spill_append(
396        &mut self,
397        globally_sorted_batches: &mut Vec<RecordBatch>,
398    ) -> Result<()> {
399        if globally_sorted_batches.is_empty() {
400            return Ok(());
401        }
402
403        // Lazily initialize the in-progress spill file
404        if self.in_progress_spill_file.is_none() {
405            self.in_progress_spill_file =
406                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
407        }
408
409        Self::organize_stringview_arrays(globally_sorted_batches)?;
410
411        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
412
413        let batches_to_spill = std::mem::take(globally_sorted_batches);
414        self.reservation.free();
415
416        let (in_progress_file, max_record_batch_size) =
417            self.in_progress_spill_file.as_mut().ok_or_else(|| {
418                internal_datafusion_err!("In-progress spill file should be initialized")
419            })?;
420
421        for batch in batches_to_spill {
422            in_progress_file.append_batch(&batch)?;
423
424            *max_record_batch_size =
425                (*max_record_batch_size).max(batch.get_sliced_size()?);
426        }
427
428        if !globally_sorted_batches.is_empty() {
429            return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
430        }
431
432        Ok(())
433    }
434
435    /// Finishes the in-progress spill file and moves it to the finished spill files.
436    async fn spill_finish(&mut self) -> Result<()> {
437        let (mut in_progress_file, max_record_batch_memory) =
438            self.in_progress_spill_file.take().ok_or_else(|| {
439                internal_datafusion_err!("Should be called after `spill_append`")
440            })?;
441        let spill_file = in_progress_file.finish()?;
442
443        if let Some(spill_file) = spill_file {
444            self.finished_spill_files.push(SortedSpillFile {
445                file: spill_file,
446                max_record_batch_memory,
447            });
448        }
449
450        Ok(())
451    }
452
453    /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
454    /// `StringViewArray` in sequential order by calling `gc()` on them.
455    ///
456    /// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
457    /// available
458    ///
459    /// # Rationale
460    /// After (merge-based) sorting, all batches will be sorted into a single run,
461    /// but physically this sorted run is chunked into many small batches. For
462    /// `StringViewArray`s inside each sorted run, their inner buffers are not
463    /// re-constructed by default, leading to non-sequential payload locations
464    /// (permutated by `interleave()` Arrow kernel). A single payload buffer might
465    /// be shared by multiple `RecordBatch`es.
466    /// When writing each batch to disk, the writer has to write all referenced buffers,
467    /// because they have to be read back one by one to reduce memory usage. This
468    /// causes extra disk reads and writes, and potentially execution failure.
469    ///
470    /// # Example
471    /// Before sorting:
472    /// batch1 -> buffer1
473    /// batch2 -> buffer2
474    ///
475    /// sorted_batch1 -> buffer1
476    ///               -> buffer2
477    /// sorted_batch2 -> buffer1
478    ///               -> buffer2
479    ///
480    /// Then when spilling each batch, the writer has to write all referenced buffers
481    /// repeatedly.
482    fn organize_stringview_arrays(
483        globally_sorted_batches: &mut Vec<RecordBatch>,
484    ) -> Result<()> {
485        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
486
487        for batch in globally_sorted_batches.drain(..) {
488            let mut new_columns: Vec<Arc<dyn Array>> =
489                Vec::with_capacity(batch.num_columns());
490
491            let mut arr_mutated = false;
492            for array in batch.columns() {
493                if let Some(string_view_array) =
494                    array.as_any().downcast_ref::<StringViewArray>()
495                {
496                    let new_array = string_view_array.gc();
497                    new_columns.push(Arc::new(new_array));
498                    arr_mutated = true;
499                } else {
500                    new_columns.push(Arc::clone(array));
501                }
502            }
503
504            let organized_batch = if arr_mutated {
505                RecordBatch::try_new(batch.schema(), new_columns)?
506            } else {
507                batch
508            };
509
510            organized_batches.push(organized_batch);
511        }
512
513        *globally_sorted_batches = organized_batches;
514
515        Ok(())
516    }
517
518    /// Sorts the in-memory batches and merges them into a single sorted run, then writes
519    /// the result to spill files.
520    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
521        if self.in_mem_batches.is_empty() {
522            return internal_err!(
523                "in_mem_batches must not be empty when attempting to sort and spill"
524            );
525        }
526
527        // Release the memory reserved for merge back to the pool so
528        // there is some left when `in_mem_sort_stream` requests an
529        // allocation. At the end of this function, memory will be
530        // reserved again for the next spill.
531        self.merge_reservation.free();
532
533        let mut sorted_stream =
534            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
535        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
536        // to construct a globally sorted stream.
537        if !self.in_mem_batches.is_empty() {
538            return internal_err!(
539                "in_mem_batches should be empty after constructing sorted stream"
540            );
541        }
542        // 'global' here refers to all buffered batches when the memory limit is
543        // reached. This variable will buffer the sorted batches after
544        // sort-preserving merge and incrementally append to spill files.
545        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
546
547        while let Some(batch) = sorted_stream.next().await {
548            let batch = batch?;
549            let sorted_size = get_reserved_byte_for_record_batch(&batch);
550            if self.reservation.try_grow(sorted_size).is_err() {
551                // Although the reservation is not enough, the batch is
552                // already in memory, so it's okay to combine it with previously
553                // sorted batches, and spill together.
554                globally_sorted_batches.push(batch);
555                self.consume_and_spill_append(&mut globally_sorted_batches)
556                    .await?; // reservation is freed in spill()
557            } else {
558                globally_sorted_batches.push(batch);
559            }
560        }
561
562        // Drop early to free up memory reserved by the sorted stream, otherwise the
563        // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
564        drop(sorted_stream);
565
566        self.consume_and_spill_append(&mut globally_sorted_batches)
567            .await?;
568        self.spill_finish().await?;
569
570        // Sanity check after spilling
571        let buffers_cleared_property =
572            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
573        if !buffers_cleared_property {
574            return internal_err!(
575                "in_mem_batches and globally_sorted_batches should be cleared before"
576            );
577        }
578
579        // Reserve headroom for next sort/merge
580        self.reserve_memory_for_merge()?;
581
582        Ok(())
583    }
584
585    /// Consumes in_mem_batches returning a sorted stream of
586    /// batches. This proceeds in one of two ways:
587    ///
588    /// # Small Datasets
589    ///
590    /// For "smaller" datasets, the data is first concatenated into a
591    /// single batch and then sorted. This is often faster than
592    /// sorting and then merging.
593    ///
594    /// ```text
595    ///        ┌─────┐
596    ///        │  2  │
597    ///        │  3  │
598    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
599    ///        │  4  │                     │  2  │
600    ///        │  2  │        │            │  3  │
601    ///        └─────┘                     │  1  │             sorted output
602    ///        ┌─────┐        ▼            │  4  │                stream
603    ///        │  1  │                     │  2  │
604    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
605    ///        │  1  │                     │  4  │
606    ///        └─────┘        ▲            │  1  │
607    ///          ...          │            │ ... │
608    ///                                    │  4  │
609    ///        ┌─────┐        │            │  3  │
610    ///        │  4  │                     └─────┘
611    ///        │  3  │─ ─ ─ ─ ┘
612    ///        └─────┘
613    ///     in_mem_batches
614    /// ```
615    ///
616    /// # Larger datasets
617    ///
618    /// For larger datasets, the batches are first sorted individually
619    /// and then merged together.
620    ///
621    /// ```text
622    ///      ┌─────┐                ┌─────┐
623    ///      │  2  │                │  1  │
624    ///      │  3  │                │  2  │
625    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
626    ///      │  4  │                │  3  │
627    ///      │  2  │                │  4  │          │
628    ///      └─────┘                └─────┘               sorted output
629    ///      ┌─────┐                ┌─────┐          ▼       stream
630    ///      │  1  │                │  1  │
631    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
632    ///      │  1  │                │  4  │
633    ///      └─────┘                └─────┘          ▲
634    ///        ...       ...         ...             │
635    ///
636    ///      ┌─────┐                ┌─────┐          │
637    ///      │  4  │                │  3  │
638    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
639    ///      └─────┘                └─────┘
640    ///
641    ///   in_mem_batches
642    /// ```
643    fn in_mem_sort_stream(
644        &mut self,
645        metrics: BaselineMetrics,
646    ) -> Result<SendableRecordBatchStream> {
647        if self.in_mem_batches.is_empty() {
648            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
649                &self.schema,
650            ))));
651        }
652
653        // The elapsed compute timer is updated when the value is dropped.
654        // There is no need for an explicit call to drop.
655        let elapsed_compute = metrics.elapsed_compute().clone();
656        let _timer = elapsed_compute.timer();
657
658        // Please pay attention that any operation inside of `in_mem_sort_stream` will
659        // not perform any memory reservation. This is for avoiding the need of handling
660        // reservation failure and spilling in the middle of the sort/merge. The memory
661        // space for batches produced by the resulting stream will be reserved by the
662        // consumer of the stream.
663
664        if self.in_mem_batches.len() == 1 {
665            let batch = self.in_mem_batches.swap_remove(0);
666            let reservation = self.reservation.take();
667            return self.sort_batch_stream(batch, metrics, reservation, true);
668        }
669
670        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
671        if self.reservation.size() < self.sort_in_place_threshold_bytes {
672            // Concatenate memory batches together and sort
673            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
674            self.in_mem_batches.clear();
675            self.reservation
676                .try_resize(get_reserved_byte_for_record_batch(&batch))
677                .map_err(Self::err_with_oom_context)?;
678            let reservation = self.reservation.take();
679            return self.sort_batch_stream(batch, metrics, reservation, true);
680        }
681
682        let streams = std::mem::take(&mut self.in_mem_batches)
683            .into_iter()
684            .map(|batch| {
685                let metrics = self.metrics.baseline.intermediate();
686                let reservation = self
687                    .reservation
688                    .split(get_reserved_byte_for_record_batch(&batch));
689                let input = self.sort_batch_stream(
690                    batch,
691                    metrics,
692                    reservation,
693                    // Passing false as `StreamingMergeBuilder` will split the
694                    // stream into batches of `self.batch_size` rows.
695                    false,
696                )?;
697                Ok(spawn_buffered(input, 1))
698            })
699            .collect::<Result<_>>()?;
700
701        StreamingMergeBuilder::new()
702            .with_streams(streams)
703            .with_schema(Arc::clone(&self.schema))
704            .with_expressions(&self.expr.clone())
705            .with_metrics(metrics)
706            .with_batch_size(self.batch_size)
707            .with_fetch(None)
708            .with_reservation(self.merge_reservation.new_empty())
709            .build()
710    }
711
712    /// Sorts a single `RecordBatch` into a single stream.
713    ///
714    /// `reservation` accounts for the memory used by this batch and
715    /// is released when the sort is complete
716    ///
717    /// passing `split` true will return a [`BatchSplitStream`] where each batch maximum row count
718    /// will be `self.batch_size`.
719    /// If `split` is false, the stream will return a single batch
720    fn sort_batch_stream(
721        &self,
722        batch: RecordBatch,
723        metrics: BaselineMetrics,
724        reservation: MemoryReservation,
725        mut split: bool,
726    ) -> Result<SendableRecordBatchStream> {
727        assert_eq!(
728            get_reserved_byte_for_record_batch(&batch),
729            reservation.size()
730        );
731
732        split = split && batch.num_rows() > self.batch_size;
733
734        let schema = batch.schema();
735
736        let expressions = self.expr.clone();
737        let stream = futures::stream::once(async move {
738            let _timer = metrics.elapsed_compute().timer();
739
740            let sorted = sort_batch(&batch, &expressions, None)?;
741
742            metrics.record_output(sorted.num_rows());
743            drop(batch);
744            drop(reservation);
745            Ok(sorted)
746        });
747
748        let mut output: SendableRecordBatchStream =
749            Box::pin(RecordBatchStreamAdapter::new(schema, stream));
750
751        if split {
752            output = Box::pin(BatchSplitStream::new(
753                output,
754                self.batch_size,
755                self.metrics.split_metrics.clone(),
756            ));
757        }
758
759        Ok(output)
760    }
761
762    /// If this sort may spill, pre-allocates
763    /// `sort_spill_reservation_bytes` of memory to guarantee memory
764    /// left for the in memory sort/merge.
765    fn reserve_memory_for_merge(&mut self) -> Result<()> {
766        // Reserve headroom for next merge sort
767        if self.runtime.disk_manager.tmp_files_enabled() {
768            let size = self.sort_spill_reservation_bytes;
769            if self.merge_reservation.size() != size {
770                self.merge_reservation
771                    .try_resize(size)
772                    .map_err(Self::err_with_oom_context)?;
773            }
774        }
775
776        Ok(())
777    }
778
779    /// Reserves memory to be able to accommodate the given batch.
780    /// If memory is scarce, tries to spill current in-memory batches to disk first.
781    async fn reserve_memory_for_batch_and_maybe_spill(
782        &mut self,
783        input: &RecordBatch,
784    ) -> Result<()> {
785        let size = get_reserved_byte_for_record_batch(input);
786
787        match self.reservation.try_grow(size) {
788            Ok(_) => Ok(()),
789            Err(e) => {
790                if self.in_mem_batches.is_empty() {
791                    return Err(Self::err_with_oom_context(e));
792                }
793
794                // Spill and try again.
795                self.sort_and_spill_in_mem_batches().await?;
796                self.reservation
797                    .try_grow(size)
798                    .map_err(Self::err_with_oom_context)
799            }
800        }
801    }
802
803    /// Wraps the error with a context message suggesting settings to tweak.
804    /// This is meant to be used with DataFusionError::ResourcesExhausted only.
805    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
806        match e {
807            DataFusionError::ResourcesExhausted(_) => e.context(
808                "Not enough memory to continue external sort. \
809                    Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
810            ),
811            // This is not an OOM error, so just return it as is.
812            _ => e,
813        }
814    }
815}
816
817/// Estimate how much memory is needed to sort a `RecordBatch`.
818///
819/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
820/// creating sorted copies of sorted columns in record batches for speeding up comparison
821/// in sorting and merging. The sorted copies are in either row format or array format.
822/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
823/// sorted copies are, they will use more memory than the original record batch.
824pub(crate) fn get_reserved_byte_for_record_batch_size(record_batch_size: usize) -> usize {
825    // 2x may not be enough for some cases, but it's a good start.
826    // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
827    // to compensate for the extra memory needed.
828    record_batch_size * 2
829}
830
831/// Estimate how much memory is needed to sort a `RecordBatch`.
832fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
833    get_reserved_byte_for_record_batch_size(get_record_batch_memory_size(batch))
834}
835
836impl Debug for ExternalSorter {
837    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
838        f.debug_struct("ExternalSorter")
839            .field("memory_used", &self.used())
840            .field("spilled_bytes", &self.spilled_bytes())
841            .field("spilled_rows", &self.spilled_rows())
842            .field("spill_count", &self.spill_count())
843            .finish()
844    }
845}
846
847pub fn sort_batch(
848    batch: &RecordBatch,
849    expressions: &LexOrdering,
850    fetch: Option<usize>,
851) -> Result<RecordBatch> {
852    let sort_columns = expressions
853        .iter()
854        .map(|expr| expr.evaluate_to_sort_column(batch))
855        .collect::<Result<Vec<_>>>()?;
856
857    let indices = lexsort_to_indices(&sort_columns, fetch)?;
858    let mut columns = take_arrays(batch.columns(), &indices, None)?;
859
860    // The columns may be larger than the unsorted columns in `batch` especially for variable length
861    // data types due to exponential growth when building the sort columns. We shrink the columns
862    // to prevent memory reservation failures, as well as excessive memory allocation when running
863    // merges in `SortPreservingMergeStream`.
864    columns.iter_mut().for_each(|c| {
865        c.shrink_to_fit();
866    });
867
868    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
869    Ok(RecordBatch::try_new_with_options(
870        batch.schema(),
871        columns,
872        &options,
873    )?)
874}
875
876/// Sort execution plan.
877///
878/// Support sorting datasets that are larger than the memory allotted
879/// by the memory manager, by spilling to disk.
880#[derive(Debug, Clone)]
881pub struct SortExec {
882    /// Input schema
883    pub(crate) input: Arc<dyn ExecutionPlan>,
884    /// Sort expressions
885    expr: LexOrdering,
886    /// Containing all metrics set created during sort
887    metrics_set: ExecutionPlanMetricsSet,
888    /// Preserve partitions of input plan. If false, the input partitions
889    /// will be sorted and merged into a single output partition.
890    preserve_partitioning: bool,
891    /// Fetch highest/lowest n results
892    fetch: Option<usize>,
893    /// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
894    common_sort_prefix: Vec<PhysicalSortExpr>,
895    /// Cache holding plan properties like equivalences, output partitioning etc.
896    cache: PlanProperties,
897    /// Filter matching the state of the sort for dynamic filter pushdown.
898    /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
899    /// If `fetch` is `None`, this will be `None`.
900    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
901}
902
903impl SortExec {
904    /// Create a new sort execution plan that produces a single,
905    /// sorted output partition.
906    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
907        let preserve_partitioning = false;
908        let (cache, sort_prefix) =
909            Self::compute_properties(&input, expr.clone(), preserve_partitioning)
910                .unwrap();
911        Self {
912            expr,
913            input,
914            metrics_set: ExecutionPlanMetricsSet::new(),
915            preserve_partitioning,
916            fetch: None,
917            common_sort_prefix: sort_prefix,
918            cache,
919            filter: None,
920        }
921    }
922
923    /// Whether this `SortExec` preserves partitioning of the children
924    pub fn preserve_partitioning(&self) -> bool {
925        self.preserve_partitioning
926    }
927
928    /// Specify the partitioning behavior of this sort exec
929    ///
930    /// If `preserve_partitioning` is true, sorts each partition
931    /// individually, producing one sorted stream for each input partition.
932    ///
933    /// If `preserve_partitioning` is false, sorts and merges all
934    /// input partitions producing a single, sorted partition.
935    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
936        self.preserve_partitioning = preserve_partitioning;
937        self.cache = self
938            .cache
939            .with_partitioning(Self::output_partitioning_helper(
940                &self.input,
941                self.preserve_partitioning,
942            ));
943        self
944    }
945
946    /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
947    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
948        let children = self
949            .expr
950            .iter()
951            .map(|sort_expr| Arc::clone(&sort_expr.expr))
952            .collect::<Vec<_>>();
953        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
954            DynamicFilterPhysicalExpr::new(children, lit(true)),
955        ))))
956    }
957
958    fn cloned(&self) -> Self {
959        SortExec {
960            input: Arc::clone(&self.input),
961            expr: self.expr.clone(),
962            metrics_set: self.metrics_set.clone(),
963            preserve_partitioning: self.preserve_partitioning,
964            common_sort_prefix: self.common_sort_prefix.clone(),
965            fetch: self.fetch,
966            cache: self.cache.clone(),
967            filter: self.filter.clone(),
968        }
969    }
970
971    /// Modify how many rows to include in the result
972    ///
973    /// If None, then all rows will be returned, in sorted order.
974    /// If Some, then only the top `fetch` rows will be returned.
975    /// This can reduce the memory pressure required by the sort
976    /// operation since rows that are not going to be included
977    /// can be dropped.
978    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
979        let mut cache = self.cache.clone();
980        // If the SortExec can emit incrementally (that means the sort requirements
981        // and properties of the input match), the SortExec can generate its result
982        // without scanning the entire input when a fetch value exists.
983        let is_pipeline_friendly = matches!(
984            self.cache.emission_type,
985            EmissionType::Incremental | EmissionType::Both
986        );
987        if fetch.is_some() && is_pipeline_friendly {
988            cache = cache.with_boundedness(Boundedness::Bounded);
989        }
990        let filter = fetch.is_some().then(|| {
991            // If we already have a filter, keep it. Otherwise, create a new one.
992            self.filter.clone().unwrap_or_else(|| self.create_filter())
993        });
994        let mut new_sort = self.cloned();
995        new_sort.fetch = fetch;
996        new_sort.cache = cache;
997        new_sort.filter = filter;
998        new_sort
999    }
1000
1001    /// Input schema
1002    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
1003        &self.input
1004    }
1005
1006    /// Sort expressions
1007    pub fn expr(&self) -> &LexOrdering {
1008        &self.expr
1009    }
1010
1011    /// If `Some(fetch)`, limits output to only the first "fetch" items
1012    pub fn fetch(&self) -> Option<usize> {
1013        self.fetch
1014    }
1015
1016    fn output_partitioning_helper(
1017        input: &Arc<dyn ExecutionPlan>,
1018        preserve_partitioning: bool,
1019    ) -> Partitioning {
1020        // Get output partitioning:
1021        if preserve_partitioning {
1022            input.output_partitioning().clone()
1023        } else {
1024            Partitioning::UnknownPartitioning(1)
1025        }
1026    }
1027
1028    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
1029    /// It also returns the common sort prefix between the input and the sort expressions.
1030    fn compute_properties(
1031        input: &Arc<dyn ExecutionPlan>,
1032        sort_exprs: LexOrdering,
1033        preserve_partitioning: bool,
1034    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
1035        let (sort_prefix, sort_satisfied) = input
1036            .equivalence_properties()
1037            .extract_common_sort_prefix(sort_exprs.clone())?;
1038
1039        // The emission type depends on whether the input is already sorted:
1040        // - If already fully sorted, we can emit results in the same way as the input
1041        // - If not sorted, we must wait until all data is processed to emit results (Final)
1042        let emission_type = if sort_satisfied {
1043            input.pipeline_behavior()
1044        } else {
1045            EmissionType::Final
1046        };
1047
1048        // The boundedness depends on whether the input is already sorted:
1049        // - If already sorted, we have the same property as the input
1050        // - If not sorted and input is unbounded, we require infinite memory and generates
1051        //   unbounded data (not practical).
1052        // - If not sorted and input is bounded, then the SortExec is bounded, too.
1053        let boundedness = if sort_satisfied {
1054            input.boundedness()
1055        } else {
1056            match input.boundedness() {
1057                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
1058                    requires_infinite_memory: true,
1059                },
1060                bounded => bounded,
1061            }
1062        };
1063
1064        // Calculate equivalence properties; i.e. reset the ordering equivalence
1065        // class with the new ordering:
1066        let mut eq_properties = input.equivalence_properties().clone();
1067        eq_properties.reorder(sort_exprs)?;
1068
1069        // Get output partitioning:
1070        let output_partitioning =
1071            Self::output_partitioning_helper(input, preserve_partitioning);
1072
1073        Ok((
1074            PlanProperties::new(
1075                eq_properties,
1076                output_partitioning,
1077                emission_type,
1078                boundedness,
1079            ),
1080            sort_prefix,
1081        ))
1082    }
1083}
1084
1085impl DisplayAs for SortExec {
1086    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1087        match t {
1088            DisplayFormatType::Default | DisplayFormatType::Verbose => {
1089                let preserve_partitioning = self.preserve_partitioning;
1090                match self.fetch {
1091                    Some(fetch) => {
1092                        write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
1093                        if let Some(filter) = &self.filter {
1094                            if let Ok(current) = filter.read().expr().current() {
1095                                if !current.eq(&lit(true)) {
1096                                    write!(f, ", filter=[{current}]")?;
1097                                }
1098                            }
1099                        }
1100                        if !self.common_sort_prefix.is_empty() {
1101                            write!(f, ", sort_prefix=[")?;
1102                            let mut first = true;
1103                            for sort_expr in &self.common_sort_prefix {
1104                                if first {
1105                                    first = false;
1106                                } else {
1107                                    write!(f, ", ")?;
1108                                }
1109                                write!(f, "{sort_expr}")?;
1110                            }
1111                            write!(f, "]")
1112                        } else {
1113                            Ok(())
1114                        }
1115                    }
1116                    None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
1117                }
1118            }
1119            DisplayFormatType::TreeRender => match self.fetch {
1120                Some(fetch) => {
1121                    writeln!(f, "{}", self.expr)?;
1122                    writeln!(f, "limit={fetch}")
1123                }
1124                None => {
1125                    writeln!(f, "{}", self.expr)
1126                }
1127            },
1128        }
1129    }
1130}
1131
1132impl ExecutionPlan for SortExec {
1133    fn name(&self) -> &'static str {
1134        match self.fetch {
1135            Some(_) => "SortExec(TopK)",
1136            None => "SortExec",
1137        }
1138    }
1139
1140    fn as_any(&self) -> &dyn Any {
1141        self
1142    }
1143
1144    fn properties(&self) -> &PlanProperties {
1145        &self.cache
1146    }
1147
1148    fn required_input_distribution(&self) -> Vec<Distribution> {
1149        if self.preserve_partitioning {
1150            vec![Distribution::UnspecifiedDistribution]
1151        } else {
1152            // global sort
1153            // TODO support RangePartition and OrderedDistribution
1154            vec![Distribution::SinglePartition]
1155        }
1156    }
1157
1158    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1159        vec![&self.input]
1160    }
1161
1162    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
1163        vec![false]
1164    }
1165
1166    fn with_new_children(
1167        self: Arc<Self>,
1168        children: Vec<Arc<dyn ExecutionPlan>>,
1169    ) -> Result<Arc<dyn ExecutionPlan>> {
1170        let mut new_sort = self.cloned();
1171        assert!(
1172            children.len() == 1,
1173            "SortExec should have exactly one child"
1174        );
1175        new_sort.input = Arc::clone(&children[0]);
1176        // Recompute the properties based on the new input since they may have changed
1177        let (cache, sort_prefix) = Self::compute_properties(
1178            &new_sort.input,
1179            new_sort.expr.clone(),
1180            new_sort.preserve_partitioning,
1181        )?;
1182        new_sort.cache = cache;
1183        new_sort.common_sort_prefix = sort_prefix;
1184
1185        Ok(Arc::new(new_sort))
1186    }
1187
1188    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1189        let children = self.children().into_iter().cloned().collect();
1190        let new_sort = self.with_new_children(children)?;
1191        let mut new_sort = new_sort
1192            .as_any()
1193            .downcast_ref::<SortExec>()
1194            .expect("cloned 1 lines above this line, we know the type")
1195            .clone();
1196        // Our dynamic filter and execution metrics are the state we need to reset.
1197        new_sort.filter = Some(new_sort.create_filter());
1198        new_sort.metrics_set = ExecutionPlanMetricsSet::new();
1199
1200        Ok(Arc::new(new_sort))
1201    }
1202
1203    fn execute(
1204        &self,
1205        partition: usize,
1206        context: Arc<TaskContext>,
1207    ) -> Result<SendableRecordBatchStream> {
1208        trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
1209
1210        let mut input = self.input.execute(partition, Arc::clone(&context))?;
1211
1212        let execution_options = &context.session_config().options().execution;
1213
1214        trace!("End SortExec's input.execute for partition: {partition}");
1215
1216        let sort_satisfied = self
1217            .input
1218            .equivalence_properties()
1219            .ordering_satisfy(self.expr.clone())?;
1220
1221        match (sort_satisfied, self.fetch.as_ref()) {
1222            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
1223                input,
1224                0,
1225                Some(*fetch),
1226                BaselineMetrics::new(&self.metrics_set, partition),
1227            ))),
1228            (true, None) => Ok(input),
1229            (false, Some(fetch)) => {
1230                let filter = self.filter.clone();
1231                let mut topk = TopK::try_new(
1232                    partition,
1233                    input.schema(),
1234                    self.common_sort_prefix.clone(),
1235                    self.expr.clone(),
1236                    *fetch,
1237                    context.session_config().batch_size(),
1238                    context.runtime_env(),
1239                    &self.metrics_set,
1240                    Arc::clone(&unwrap_or_internal_err!(filter)),
1241                )?;
1242                Ok(Box::pin(RecordBatchStreamAdapter::new(
1243                    self.schema(),
1244                    futures::stream::once(async move {
1245                        while let Some(batch) = input.next().await {
1246                            let batch = batch?;
1247                            topk.insert_batch(batch)?;
1248                            if topk.finished {
1249                                break;
1250                            }
1251                        }
1252                        topk.emit()
1253                    })
1254                    .try_flatten(),
1255                )))
1256            }
1257            (false, None) => {
1258                let mut sorter = ExternalSorter::new(
1259                    partition,
1260                    input.schema(),
1261                    self.expr.clone(),
1262                    context.session_config().batch_size(),
1263                    execution_options.sort_spill_reservation_bytes,
1264                    execution_options.sort_in_place_threshold_bytes,
1265                    context.session_config().spill_compression(),
1266                    &self.metrics_set,
1267                    context.runtime_env(),
1268                )?;
1269                Ok(Box::pin(RecordBatchStreamAdapter::new(
1270                    self.schema(),
1271                    futures::stream::once(async move {
1272                        while let Some(batch) = input.next().await {
1273                            let batch = batch?;
1274                            sorter.insert_batch(batch).await?;
1275                        }
1276                        sorter.sort().await
1277                    })
1278                    .try_flatten(),
1279                )))
1280            }
1281        }
1282    }
1283
1284    fn metrics(&self) -> Option<MetricsSet> {
1285        Some(self.metrics_set.clone_inner())
1286    }
1287
1288    fn statistics(&self) -> Result<Statistics> {
1289        self.partition_statistics(None)
1290    }
1291
1292    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1293        if !self.preserve_partitioning() {
1294            return self.input.partition_statistics(None)?.with_fetch(
1295                self.schema(),
1296                self.fetch,
1297                0,
1298                1,
1299            );
1300        }
1301        self.input.partition_statistics(partition)?.with_fetch(
1302            self.schema(),
1303            self.fetch,
1304            0,
1305            1,
1306        )
1307    }
1308
1309    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1310        Some(Arc::new(SortExec::with_fetch(self, limit)))
1311    }
1312
1313    fn fetch(&self) -> Option<usize> {
1314        self.fetch
1315    }
1316
1317    fn cardinality_effect(&self) -> CardinalityEffect {
1318        if self.fetch.is_none() {
1319            CardinalityEffect::Equal
1320        } else {
1321            CardinalityEffect::LowerEqual
1322        }
1323    }
1324
1325    /// Tries to swap the projection with its input [`SortExec`]. If it can be done,
1326    /// it returns the new swapped version having the [`SortExec`] as the top plan.
1327    /// Otherwise, it returns None.
1328    fn try_swapping_with_projection(
1329        &self,
1330        projection: &ProjectionExec,
1331    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1332        // If the projection does not narrow the schema, we should not try to push it down.
1333        if projection.expr().len() >= projection.input().schema().fields().len() {
1334            return Ok(None);
1335        }
1336
1337        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
1338        else {
1339            return Ok(None);
1340        };
1341
1342        Ok(Some(Arc::new(
1343            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
1344                .with_fetch(self.fetch())
1345                .with_preserve_partitioning(self.preserve_partitioning()),
1346        )))
1347    }
1348
1349    fn gather_filters_for_pushdown(
1350        &self,
1351        phase: FilterPushdownPhase,
1352        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1353        config: &datafusion_common::config::ConfigOptions,
1354    ) -> Result<FilterDescription> {
1355        if !matches!(phase, FilterPushdownPhase::Post) {
1356            return FilterDescription::from_children(parent_filters, &self.children());
1357        }
1358
1359        let mut child =
1360            ChildFilterDescription::from_child(&parent_filters, self.input())?;
1361
1362        if let Some(filter) = &self.filter {
1363            if config.optimizer.enable_dynamic_filter_pushdown {
1364                child = child.with_self_filter(filter.read().expr());
1365            }
1366        }
1367
1368        Ok(FilterDescription::new().with_child(child))
1369    }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374    use std::collections::HashMap;
1375    use std::pin::Pin;
1376    use std::task::{Context, Poll};
1377
1378    use super::*;
1379    use crate::coalesce_partitions::CoalescePartitionsExec;
1380    use crate::collect;
1381    use crate::execution_plan::Boundedness;
1382    use crate::expressions::col;
1383    use crate::test;
1384    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
1385    use crate::test::TestMemoryExec;
1386    use crate::test::{assert_is_pending, make_partition};
1387
1388    use arrow::array::*;
1389    use arrow::compute::SortOptions;
1390    use arrow::datatypes::*;
1391    use datafusion_common::cast::as_primitive_array;
1392    use datafusion_common::test_util::batches_to_string;
1393    use datafusion_common::{DataFusionError, Result, ScalarValue};
1394    use datafusion_execution::config::SessionConfig;
1395    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1396    use datafusion_execution::RecordBatchStream;
1397    use datafusion_physical_expr::expressions::{Column, Literal};
1398    use datafusion_physical_expr::EquivalenceProperties;
1399
1400    use futures::{FutureExt, Stream};
1401    use insta::assert_snapshot;
1402
1403    #[derive(Debug, Clone)]
1404    pub struct SortedUnboundedExec {
1405        schema: Schema,
1406        batch_size: u64,
1407        cache: PlanProperties,
1408    }
1409
1410    impl DisplayAs for SortedUnboundedExec {
1411        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1412            match t {
1413                DisplayFormatType::Default
1414                | DisplayFormatType::Verbose
1415                | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1416            }
1417            Ok(())
1418        }
1419    }
1420
1421    impl SortedUnboundedExec {
1422        fn compute_properties(schema: SchemaRef) -> PlanProperties {
1423            let mut eq_properties = EquivalenceProperties::new(schema);
1424            eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
1425                Column::new("c1", 0),
1426            ))]);
1427            PlanProperties::new(
1428                eq_properties,
1429                Partitioning::UnknownPartitioning(1),
1430                EmissionType::Final,
1431                Boundedness::Unbounded {
1432                    requires_infinite_memory: false,
1433                },
1434            )
1435        }
1436    }
1437
1438    impl ExecutionPlan for SortedUnboundedExec {
1439        fn name(&self) -> &'static str {
1440            Self::static_name()
1441        }
1442
1443        fn as_any(&self) -> &dyn Any {
1444            self
1445        }
1446
1447        fn properties(&self) -> &PlanProperties {
1448            &self.cache
1449        }
1450
1451        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1452            vec![]
1453        }
1454
1455        fn with_new_children(
1456            self: Arc<Self>,
1457            _: Vec<Arc<dyn ExecutionPlan>>,
1458        ) -> Result<Arc<dyn ExecutionPlan>> {
1459            Ok(self)
1460        }
1461
1462        fn execute(
1463            &self,
1464            _partition: usize,
1465            _context: Arc<TaskContext>,
1466        ) -> Result<SendableRecordBatchStream> {
1467            Ok(Box::pin(SortedUnboundedStream {
1468                schema: Arc::new(self.schema.clone()),
1469                batch_size: self.batch_size,
1470                offset: 0,
1471            }))
1472        }
1473    }
1474
1475    #[derive(Debug)]
1476    pub struct SortedUnboundedStream {
1477        schema: SchemaRef,
1478        batch_size: u64,
1479        offset: u64,
1480    }
1481
1482    impl Stream for SortedUnboundedStream {
1483        type Item = Result<RecordBatch>;
1484
1485        fn poll_next(
1486            mut self: Pin<&mut Self>,
1487            _cx: &mut Context<'_>,
1488        ) -> Poll<Option<Self::Item>> {
1489            let batch = SortedUnboundedStream::create_record_batch(
1490                Arc::clone(&self.schema),
1491                self.offset,
1492                self.batch_size,
1493            );
1494            self.offset += self.batch_size;
1495            Poll::Ready(Some(Ok(batch)))
1496        }
1497    }
1498
1499    impl RecordBatchStream for SortedUnboundedStream {
1500        fn schema(&self) -> SchemaRef {
1501            Arc::clone(&self.schema)
1502        }
1503    }
1504
1505    impl SortedUnboundedStream {
1506        fn create_record_batch(
1507            schema: SchemaRef,
1508            offset: u64,
1509            batch_size: u64,
1510        ) -> RecordBatch {
1511            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1512            let array = UInt64Array::from(values);
1513            let array_ref: ArrayRef = Arc::new(array);
1514            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1515        }
1516    }
1517
1518    #[tokio::test]
1519    async fn test_in_mem_sort() -> Result<()> {
1520        let task_ctx = Arc::new(TaskContext::default());
1521        let partitions = 4;
1522        let csv = test::scan_partitioned(partitions);
1523        let schema = csv.schema();
1524
1525        let sort_exec = Arc::new(SortExec::new(
1526            [PhysicalSortExpr {
1527                expr: col("i", &schema)?,
1528                options: SortOptions::default(),
1529            }]
1530            .into(),
1531            Arc::new(CoalescePartitionsExec::new(csv)),
1532        ));
1533
1534        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
1535
1536        assert_eq!(result.len(), 1);
1537        assert_eq!(result[0].num_rows(), 400);
1538        assert_eq!(
1539            task_ctx.runtime_env().memory_pool.reserved(),
1540            0,
1541            "The sort should have returned all memory used back to the memory manager"
1542        );
1543
1544        Ok(())
1545    }
1546
1547    #[tokio::test]
1548    async fn test_sort_spill() -> Result<()> {
1549        // trigger spill w/ 100 batches
1550        let session_config = SessionConfig::new();
1551        let sort_spill_reservation_bytes = session_config
1552            .options()
1553            .execution
1554            .sort_spill_reservation_bytes;
1555        let runtime = RuntimeEnvBuilder::new()
1556            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
1557            .build_arc()?;
1558        let task_ctx = Arc::new(
1559            TaskContext::default()
1560                .with_session_config(session_config)
1561                .with_runtime(runtime),
1562        );
1563
1564        // The input has 100 partitions, each partition has a batch containing 100 rows.
1565        // Each row has a single Int32 column with values 0..100. The total size of the
1566        // input is roughly 40000 bytes.
1567        let partitions = 100;
1568        let input = test::scan_partitioned(partitions);
1569        let schema = input.schema();
1570
1571        let sort_exec = Arc::new(SortExec::new(
1572            [PhysicalSortExpr {
1573                expr: col("i", &schema)?,
1574                options: SortOptions::default(),
1575            }]
1576            .into(),
1577            Arc::new(CoalescePartitionsExec::new(input)),
1578        ));
1579
1580        let result = collect(
1581            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
1582            Arc::clone(&task_ctx),
1583        )
1584        .await?;
1585
1586        assert_eq!(result.len(), 2);
1587
1588        // Now, validate metrics
1589        let metrics = sort_exec.metrics().unwrap();
1590
1591        assert_eq!(metrics.output_rows().unwrap(), 10000);
1592        assert!(metrics.elapsed_compute().unwrap() > 0);
1593
1594        let spill_count = metrics.spill_count().unwrap();
1595        let spilled_rows = metrics.spilled_rows().unwrap();
1596        let spilled_bytes = metrics.spilled_bytes().unwrap();
1597        // Processing 40000 bytes of data using 12288 bytes of memory requires 3 spills
1598        // unless we do something really clever. It will spill roughly 9000+ rows and 36000
1599        // bytes. We leave a little wiggle room for the actual numbers.
1600        assert!((3..=10).contains(&spill_count));
1601        assert!((9000..=10000).contains(&spilled_rows));
1602        assert!((38000..=44000).contains(&spilled_bytes));
1603
1604        let columns = result[0].columns();
1605
1606        let i = as_primitive_array::<Int32Type>(&columns[0])?;
1607        assert_eq!(i.value(0), 0);
1608        assert_eq!(i.value(i.len() - 1), 81);
1609        assert_eq!(
1610            task_ctx.runtime_env().memory_pool.reserved(),
1611            0,
1612            "The sort should have returned all memory used back to the memory manager"
1613        );
1614
1615        Ok(())
1616    }
1617
1618    #[tokio::test]
1619    async fn test_batch_reservation_error() -> Result<()> {
1620        // Pick a memory limit and sort_spill_reservation that make the first batch reservation fail.
1621        // These values assume that the ExternalSorter will reserve 800 bytes for the first batch.
1622        let expected_batch_reservation = 800;
1623        let merge_reservation: usize = 0; // Set to 0 for simplicity
1624        let memory_limit: usize = expected_batch_reservation + merge_reservation - 1; // Just short of what we need
1625
1626        let session_config =
1627            SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
1628        let runtime = RuntimeEnvBuilder::new()
1629            .with_memory_limit(memory_limit, 1.0)
1630            .build_arc()?;
1631        let task_ctx = Arc::new(
1632            TaskContext::default()
1633                .with_session_config(session_config)
1634                .with_runtime(runtime),
1635        );
1636
1637        let plan = test::scan_partitioned(1);
1638
1639        // Read the first record batch to assert that our memory limit and sort_spill_reservation
1640        // settings trigger the test scenario.
1641        {
1642            let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
1643            let first_batch = stream.next().await.unwrap()?;
1644            let batch_reservation = get_reserved_byte_for_record_batch(&first_batch);
1645
1646            assert_eq!(batch_reservation, expected_batch_reservation);
1647            assert!(memory_limit < (merge_reservation + batch_reservation));
1648        }
1649
1650        let sort_exec = Arc::new(SortExec::new(
1651            [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
1652            plan,
1653        ));
1654
1655        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;
1656
1657        let err = result.unwrap_err();
1658        assert!(
1659            matches!(err, DataFusionError::Context(..)),
1660            "Assertion failed: expected a Context error, but got: {err:?}"
1661        );
1662
1663        // Assert that the context error is wrapping a resources exhausted error.
1664        assert!(
1665            matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
1666            "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
1667        );
1668
1669        Ok(())
1670    }
1671
1672    #[tokio::test]
1673    async fn test_sort_spill_utf8_strings() -> Result<()> {
1674        let session_config = SessionConfig::new()
1675            .with_batch_size(100)
1676            .with_sort_in_place_threshold_bytes(20 * 1024)
1677            .with_sort_spill_reservation_bytes(100 * 1024);
1678        let runtime = RuntimeEnvBuilder::new()
1679            .with_memory_limit(500 * 1024, 1.0)
1680            .build_arc()?;
1681        let task_ctx = Arc::new(
1682            TaskContext::default()
1683                .with_session_config(session_config)
1684                .with_runtime(runtime),
1685        );
1686
1687        // The input has 200 partitions, each partition has a batch containing 100 rows.
1688        // Each row has a single Utf8 column, the Utf8 string values are roughly 42 bytes.
1689        // The total size of the input is roughly 8.4 KB.
1690        let input = test::scan_partitioned_utf8(200);
1691        let schema = input.schema();
1692
1693        let sort_exec = Arc::new(SortExec::new(
1694            [PhysicalSortExpr {
1695                expr: col("i", &schema)?,
1696                options: SortOptions::default(),
1697            }]
1698            .into(),
1699            Arc::new(CoalescePartitionsExec::new(input)),
1700        ));
1701
1702        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1703
1704        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
1705        assert_eq!(num_rows, 20000);
1706
1707        // Now, validate metrics
1708        let metrics = sort_exec.metrics().unwrap();
1709
1710        assert_eq!(metrics.output_rows().unwrap(), 20000);
1711        assert!(metrics.elapsed_compute().unwrap() > 0);
1712
1713        let spill_count = metrics.spill_count().unwrap();
1714        let spilled_rows = metrics.spilled_rows().unwrap();
1715        let spilled_bytes = metrics.spilled_bytes().unwrap();
1716
1717        // This test case is processing 840KB of data using 400KB of memory. Note
1718        // that buffered batches can't be dropped until all sorted batches are
1719        // generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
1720        // batches.
1721        // The number of spills is roughly calculated as:
1722        //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
1723
1724        // If this assertion fail with large spill count, make sure the following
1725        // case does not happen:
1726        // During external sorting, one sorted run should be spilled to disk in a
1727        // single file, due to memory limit we might need to append to the file
1728        // multiple times to spill all the data. Make sure we're not writing each
1729        // appending as a separate file.
1730        assert!((4..=8).contains(&spill_count));
1731        assert!((15000..=20000).contains(&spilled_rows));
1732        assert!((900000..=1000000).contains(&spilled_bytes));
1733
1734        // Verify that the result is sorted
1735        let concated_result = concat_batches(&schema, &result)?;
1736        let columns = concated_result.columns();
1737        let string_array = as_string_array(&columns[0]);
1738        for i in 0..string_array.len() - 1 {
1739            assert!(string_array.value(i) <= string_array.value(i + 1));
1740        }
1741
1742        assert_eq!(
1743            task_ctx.runtime_env().memory_pool.reserved(),
1744            0,
1745            "The sort should have returned all memory used back to the memory manager"
1746        );
1747
1748        Ok(())
1749    }
1750
1751    #[tokio::test]
1752    async fn test_sort_fetch_memory_calculation() -> Result<()> {
1753        // This test mirrors down the size from the example above.
1754        let avg_batch_size = 400;
1755        let partitions = 4;
1756
1757        // A tuple of (fetch, expect_spillage)
1758        let test_options = vec![
1759            // Since we don't have a limit (and the memory is less than the total size of
1760            // all the batches we are processing, we expect it to spill.
1761            (None, true),
1762            // When we have a limit however, the buffered size of batches should fit in memory
1763            // since it is much lower than the total size of the input batch.
1764            (Some(1), false),
1765        ];
1766
1767        for (fetch, expect_spillage) in test_options {
1768            let session_config = SessionConfig::new();
1769            let sort_spill_reservation_bytes = session_config
1770                .options()
1771                .execution
1772                .sort_spill_reservation_bytes;
1773
1774            let runtime = RuntimeEnvBuilder::new()
1775                .with_memory_limit(
1776                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1777                    1.0,
1778                )
1779                .build_arc()?;
1780            let task_ctx = Arc::new(
1781                TaskContext::default()
1782                    .with_runtime(runtime)
1783                    .with_session_config(session_config),
1784            );
1785
1786            let csv = test::scan_partitioned(partitions);
1787            let schema = csv.schema();
1788
1789            let sort_exec = Arc::new(
1790                SortExec::new(
1791                    [PhysicalSortExpr {
1792                        expr: col("i", &schema)?,
1793                        options: SortOptions::default(),
1794                    }]
1795                    .into(),
1796                    Arc::new(CoalescePartitionsExec::new(csv)),
1797                )
1798                .with_fetch(fetch),
1799            );
1800
1801            let result =
1802                collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1803            assert_eq!(result.len(), 1);
1804
1805            let metrics = sort_exec.metrics().unwrap();
1806            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
1807            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
1808        }
1809        Ok(())
1810    }
1811
1812    #[tokio::test]
1813    async fn test_sort_metadata() -> Result<()> {
1814        let task_ctx = Arc::new(TaskContext::default());
1815        let field_metadata: HashMap<String, String> =
1816            vec![("foo".to_string(), "bar".to_string())]
1817                .into_iter()
1818                .collect();
1819        let schema_metadata: HashMap<String, String> =
1820            vec![("baz".to_string(), "barf".to_string())]
1821                .into_iter()
1822                .collect();
1823
1824        let mut field = Field::new("field_name", DataType::UInt64, true);
1825        field.set_metadata(field_metadata.clone());
1826        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1827        let schema = Arc::new(schema);
1828
1829        let data: ArrayRef =
1830            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1831
1832        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1833        let input =
1834            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1835
1836        let sort_exec = Arc::new(SortExec::new(
1837            [PhysicalSortExpr {
1838                expr: col("field_name", &schema)?,
1839                options: SortOptions::default(),
1840            }]
1841            .into(),
1842            input,
1843        ));
1844
1845        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1846
1847        let expected_data: ArrayRef =
1848            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1849        let expected_batch =
1850            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
1851
1852        // Data is correct
1853        assert_eq!(&vec![expected_batch], &result);
1854
1855        // explicitly ensure the metadata is present
1856        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
1857        assert_eq!(result[0].schema().metadata(), &schema_metadata);
1858
1859        Ok(())
1860    }
1861
1862    #[tokio::test]
1863    async fn test_lex_sort_by_mixed_types() -> Result<()> {
1864        let task_ctx = Arc::new(TaskContext::default());
1865        let schema = Arc::new(Schema::new(vec![
1866            Field::new("a", DataType::Int32, true),
1867            Field::new(
1868                "b",
1869                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1870                true,
1871            ),
1872        ]));
1873
1874        // define data.
1875        let batch = RecordBatch::try_new(
1876            Arc::clone(&schema),
1877            vec![
1878                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
1879                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1880                    Some(vec![Some(3)]),
1881                    Some(vec![Some(1)]),
1882                    Some(vec![Some(6), None]),
1883                    Some(vec![Some(5)]),
1884                ])),
1885            ],
1886        )?;
1887
1888        let sort_exec = Arc::new(SortExec::new(
1889            [
1890                PhysicalSortExpr {
1891                    expr: col("a", &schema)?,
1892                    options: SortOptions {
1893                        descending: false,
1894                        nulls_first: true,
1895                    },
1896                },
1897                PhysicalSortExpr {
1898                    expr: col("b", &schema)?,
1899                    options: SortOptions {
1900                        descending: true,
1901                        nulls_first: false,
1902                    },
1903                },
1904            ]
1905            .into(),
1906            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
1907        ));
1908
1909        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
1910        assert_eq!(
1911            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1912            *sort_exec.schema().field(1).data_type()
1913        );
1914
1915        let result: Vec<RecordBatch> =
1916            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
1917        let metrics = sort_exec.metrics().unwrap();
1918        assert!(metrics.elapsed_compute().unwrap() > 0);
1919        assert_eq!(metrics.output_rows().unwrap(), 4);
1920        assert_eq!(result.len(), 1);
1921
1922        let expected = RecordBatch::try_new(
1923            schema,
1924            vec![
1925                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
1926                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1927                    Some(vec![Some(1)]),
1928                    Some(vec![Some(6), None]),
1929                    Some(vec![Some(5)]),
1930                    Some(vec![Some(3)]),
1931                ])),
1932            ],
1933        )?;
1934
1935        assert_eq!(expected, result[0]);
1936
1937        Ok(())
1938    }
1939
1940    #[tokio::test]
1941    async fn test_lex_sort_by_float() -> Result<()> {
1942        let task_ctx = Arc::new(TaskContext::default());
1943        let schema = Arc::new(Schema::new(vec![
1944            Field::new("a", DataType::Float32, true),
1945            Field::new("b", DataType::Float64, true),
1946        ]));
1947
1948        // define data.
1949        let batch = RecordBatch::try_new(
1950            Arc::clone(&schema),
1951            vec![
1952                Arc::new(Float32Array::from(vec![
1953                    Some(f32::NAN),
1954                    None,
1955                    None,
1956                    Some(f32::NAN),
1957                    Some(1.0_f32),
1958                    Some(1.0_f32),
1959                    Some(2.0_f32),
1960                    Some(3.0_f32),
1961                ])),
1962                Arc::new(Float64Array::from(vec![
1963                    Some(200.0_f64),
1964                    Some(20.0_f64),
1965                    Some(10.0_f64),
1966                    Some(100.0_f64),
1967                    Some(f64::NAN),
1968                    None,
1969                    None,
1970                    Some(f64::NAN),
1971                ])),
1972            ],
1973        )?;
1974
1975        let sort_exec = Arc::new(SortExec::new(
1976            [
1977                PhysicalSortExpr {
1978                    expr: col("a", &schema)?,
1979                    options: SortOptions {
1980                        descending: true,
1981                        nulls_first: true,
1982                    },
1983                },
1984                PhysicalSortExpr {
1985                    expr: col("b", &schema)?,
1986                    options: SortOptions {
1987                        descending: false,
1988                        nulls_first: false,
1989                    },
1990                },
1991            ]
1992            .into(),
1993            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
1994        ));
1995
1996        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
1997        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
1998
1999        let result: Vec<RecordBatch> =
2000            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
2001        let metrics = sort_exec.metrics().unwrap();
2002        assert!(metrics.elapsed_compute().unwrap() > 0);
2003        assert_eq!(metrics.output_rows().unwrap(), 8);
2004        assert_eq!(result.len(), 1);
2005
2006        let columns = result[0].columns();
2007
2008        assert_eq!(DataType::Float32, *columns[0].data_type());
2009        assert_eq!(DataType::Float64, *columns[1].data_type());
2010
2011        let a = as_primitive_array::<Float32Type>(&columns[0])?;
2012        let b = as_primitive_array::<Float64Type>(&columns[1])?;
2013
2014        // convert result to strings to allow comparing to expected result containing NaN
2015        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
2016            .map(|i| {
2017                let aval = if a.is_valid(i) {
2018                    Some(a.value(i).to_string())
2019                } else {
2020                    None
2021                };
2022                let bval = if b.is_valid(i) {
2023                    Some(b.value(i).to_string())
2024                } else {
2025                    None
2026                };
2027                (aval, bval)
2028            })
2029            .collect();
2030
2031        let expected: Vec<(Option<String>, Option<String>)> = vec![
2032            (None, Some("10".to_owned())),
2033            (None, Some("20".to_owned())),
2034            (Some("NaN".to_owned()), Some("100".to_owned())),
2035            (Some("NaN".to_owned()), Some("200".to_owned())),
2036            (Some("3".to_owned()), Some("NaN".to_owned())),
2037            (Some("2".to_owned()), None),
2038            (Some("1".to_owned()), Some("NaN".to_owned())),
2039            (Some("1".to_owned()), None),
2040        ];
2041
2042        assert_eq!(expected, result);
2043
2044        Ok(())
2045    }
2046
2047    #[tokio::test]
2048    async fn test_drop_cancel() -> Result<()> {
2049        let task_ctx = Arc::new(TaskContext::default());
2050        let schema =
2051            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2052
2053        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2054        let refs = blocking_exec.refs();
2055        let sort_exec = Arc::new(SortExec::new(
2056            [PhysicalSortExpr {
2057                expr: col("a", &schema)?,
2058                options: SortOptions::default(),
2059            }]
2060            .into(),
2061            blocking_exec,
2062        ));
2063
2064        let fut = collect(sort_exec, Arc::clone(&task_ctx));
2065        let mut fut = fut.boxed();
2066
2067        assert_is_pending(&mut fut);
2068        drop(fut);
2069        assert_strong_count_converges_to_zero(refs).await;
2070
2071        assert_eq!(
2072            task_ctx.runtime_env().memory_pool.reserved(),
2073            0,
2074            "The sort should have returned all memory used back to the memory manager"
2075        );
2076
2077        Ok(())
2078    }
2079
2080    #[test]
2081    fn test_empty_sort_batch() {
2082        let schema = Arc::new(Schema::empty());
2083        let options = RecordBatchOptions::new().with_row_count(Some(1));
2084        let batch =
2085            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
2086                .unwrap();
2087
2088        let expressions = [PhysicalSortExpr {
2089            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2090            options: SortOptions::default(),
2091        }]
2092        .into();
2093
2094        let result = sort_batch(&batch, &expressions, None).unwrap();
2095        assert_eq!(result.num_rows(), 1);
2096    }
2097
2098    #[tokio::test]
2099    async fn topk_unbounded_source() -> Result<()> {
2100        let task_ctx = Arc::new(TaskContext::default());
2101        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
2102        let source = SortedUnboundedExec {
2103            schema: schema.clone(),
2104            batch_size: 2,
2105            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
2106        };
2107        let mut plan = SortExec::new(
2108            [PhysicalSortExpr::new_default(Arc::new(Column::new(
2109                "c1", 0,
2110            )))]
2111            .into(),
2112            Arc::new(source),
2113        );
2114        plan = plan.with_fetch(Some(9));
2115
2116        let batches = collect(Arc::new(plan), task_ctx).await?;
2117        assert_snapshot!(batches_to_string(&batches), @r#"
2118            +----+
2119            | c1 |
2120            +----+
2121            | 0  |
2122            | 1  |
2123            | 2  |
2124            | 3  |
2125            | 4  |
2126            | 5  |
2127            | 6  |
2128            | 7  |
2129            | 8  |
2130            +----+
2131            "#);
2132        Ok(())
2133    }
2134
2135    #[tokio::test]
2136    async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2137        let batch_size = 100;
2138
2139        let create_task_ctx = |_: &[RecordBatch]| {
2140            TaskContext::default().with_session_config(
2141                SessionConfig::new()
2142                    .with_batch_size(batch_size)
2143                    .with_sort_in_place_threshold_bytes(usize::MAX),
2144            )
2145        };
2146
2147        // Smaller than batch size and require more than a single batch to get the requested batch size
2148        test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2149
2150        // Not evenly divisible by batch size
2151        test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2152
2153        // Evenly divisible by batch size and is larger than 2 output batches
2154        test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2155
2156        Ok(())
2157    }
2158
2159    #[tokio::test]
2160    async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place(
2161    ) -> Result<()> {
2162        let batch_size = 100;
2163
2164        let create_task_ctx = |_: &[RecordBatch]| {
2165            TaskContext::default().with_session_config(
2166                SessionConfig::new()
2167                    .with_batch_size(batch_size)
2168                    .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2169            )
2170        };
2171
2172        // Smaller than batch size and require more than a single batch to get the requested batch size
2173        {
2174            let metrics =
2175                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2176
2177            assert_eq!(
2178                metrics.spill_count(),
2179                Some(0),
2180                "Expected no spills when sorting in place"
2181            );
2182        }
2183
2184        // Not evenly divisible by batch size
2185        {
2186            let metrics =
2187                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2188
2189            assert_eq!(
2190                metrics.spill_count(),
2191                Some(0),
2192                "Expected no spills when sorting in place"
2193            );
2194        }
2195
2196        // Evenly divisible by batch size and is larger than 2 output batches
2197        {
2198            let metrics =
2199                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2200
2201            assert_eq!(
2202                metrics.spill_count(),
2203                Some(0),
2204                "Expected no spills when sorting in place"
2205            );
2206        }
2207
2208        Ok(())
2209    }
2210
2211    #[tokio::test]
2212    async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch(
2213    ) -> Result<()> {
2214        let batch_size = 100;
2215
2216        let create_task_ctx = |_: &[RecordBatch]| {
2217            TaskContext::default()
2218                .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2219        };
2220
2221        // Smaller than batch size and require more than a single batch to get the requested batch size
2222        {
2223            let metrics = test_sort_output_batch_size(
2224                // Single batch
2225                1,
2226                batch_size / 4,
2227                create_task_ctx,
2228            )
2229            .await?;
2230
2231            assert_eq!(
2232                metrics.spill_count(),
2233                Some(0),
2234                "Expected no spills when sorting in place"
2235            );
2236        }
2237
2238        // Not evenly divisible by batch size
2239        {
2240            let metrics = test_sort_output_batch_size(
2241                // Single batch
2242                1,
2243                batch_size + 7,
2244                create_task_ctx,
2245            )
2246            .await?;
2247
2248            assert_eq!(
2249                metrics.spill_count(),
2250                Some(0),
2251                "Expected no spills when sorting in place"
2252            );
2253        }
2254
2255        // Evenly divisible by batch size and is larger than 2 output batches
2256        {
2257            let metrics = test_sort_output_batch_size(
2258                // Single batch
2259                1,
2260                batch_size * 3,
2261                create_task_ctx,
2262            )
2263            .await?;
2264
2265            assert_eq!(
2266                metrics.spill_count(),
2267                Some(0),
2268                "Expected no spills when sorting in place"
2269            );
2270        }
2271
2272        Ok(())
2273    }
2274
2275    #[tokio::test]
2276    async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill(
2277    ) -> Result<()> {
2278        let batch_size = 100;
2279
2280        let create_task_ctx = |generated_batches: &[RecordBatch]| {
2281            let batches_memory = generated_batches
2282                .iter()
2283                .map(|b| b.get_array_memory_size())
2284                .sum::<usize>();
2285
2286            TaskContext::default()
2287                .with_session_config(
2288                    SessionConfig::new()
2289                        .with_batch_size(batch_size)
2290                        // To make sure there is no in place sorting
2291                        .with_sort_in_place_threshold_bytes(1)
2292                        .with_sort_spill_reservation_bytes(1),
2293                )
2294                .with_runtime(
2295                    RuntimeEnvBuilder::default()
2296                        .with_memory_limit(batches_memory, 1.0)
2297                        .build_arc()
2298                        .unwrap(),
2299                )
2300        };
2301
2302        // Smaller than batch size and require more than a single batch to get the requested batch size
2303        {
2304            let metrics =
2305                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2306
2307            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2308        }
2309
2310        // Not evenly divisible by batch size
2311        {
2312            let metrics =
2313                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2314
2315            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2316        }
2317
2318        // Evenly divisible by batch size and is larger than 2 batches
2319        {
2320            let metrics =
2321                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2322
2323            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2324        }
2325
2326        Ok(())
2327    }
2328
2329    async fn test_sort_output_batch_size(
2330        number_of_batches: usize,
2331        batch_size_to_generate: usize,
2332        create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2333    ) -> Result<MetricsSet> {
2334        let batches = (0..number_of_batches)
2335            .map(|_| make_partition(batch_size_to_generate as i32))
2336            .collect::<Vec<_>>();
2337        let task_ctx = create_task_ctx(batches.as_slice());
2338
2339        let expected_batch_size = task_ctx.session_config().batch_size();
2340
2341        let (mut output_batches, metrics) =
2342            run_sort_on_input(task_ctx, "i", batches).await?;
2343
2344        let last_batch = output_batches.pop().unwrap();
2345
2346        for batch in output_batches {
2347            assert_eq!(batch.num_rows(), expected_batch_size);
2348        }
2349
2350        let mut last_expected_batch_size =
2351            (batch_size_to_generate * number_of_batches) % expected_batch_size;
2352        if last_expected_batch_size == 0 {
2353            last_expected_batch_size = expected_batch_size;
2354        }
2355        assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2356
2357        Ok(metrics)
2358    }
2359
2360    async fn run_sort_on_input(
2361        task_ctx: TaskContext,
2362        order_by_col: &str,
2363        batches: Vec<RecordBatch>,
2364    ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2365        let task_ctx = Arc::new(task_ctx);
2366
2367        // let task_ctx = env.
2368        let schema = batches[0].schema();
2369        let ordering: LexOrdering = [PhysicalSortExpr {
2370            expr: col(order_by_col, &schema)?,
2371            options: SortOptions {
2372                descending: false,
2373                nulls_first: true,
2374            },
2375        }]
2376        .into();
2377        let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2378            ordering.clone(),
2379            TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2380        ));
2381
2382        let sorted_batches =
2383            collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2384
2385        let metrics = sort_exec.metrics().expect("sort have metrics");
2386
2387        // assert output
2388        {
2389            let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2390            let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2391
2392            let sorted_batches_concat =
2393                concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2394
2395            assert_eq!(sorted_input_batch, sorted_batches_concat);
2396        }
2397
2398        Ok((sorted_batches, metrics))
2399    }
2400}