datafusion_physical_plan/sorts/sort.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Sort that handles input of arbitrary size.
//! It sorts in memory if it has enough memory budget,
//! but spills to disk if needed.
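//!
//! A minimal usage sketch (not a doctest; it assumes an input plan `input`, a
//! `LexOrdering` named `ordering`, and a `TaskContext` named `ctx` are already
//! in scope):
//!
//! ```text
//! // Sort the input into a single, totally sorted output partition:
//! let sort = SortExec::new(ordering, input);
//! // Or keep only the top 100 rows (TopK):
//! // let sort = sort.with_fetch(Some(100));
//! let stream = sort.execute(0, ctx)?;
//! ```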

use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use parking_lot::RwLock;

use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::filter_pushdown::{
    ChildFilterDescription, FilterDescription, FilterPushdownPhase,
};
use crate::limit::LimitStream;
use crate::metrics::{
    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
};
use crate::projection::{ProjectionExec, make_with_child, update_ordering};
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::spill::get_record_batch_memory_size;
use crate::spill::in_progress_spill_file::InProgressSpillFile;
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
use crate::stream::RecordBatchStreamAdapter;
use crate::stream::ReservationStream;
use crate::topk::TopK;
use crate::topk::TopKDynamicFilters;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
    Statistics,
};

use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
use datafusion_common::{
    DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
    unwrap_or_internal_err,
};
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};

use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};
struct ExternalSorterMetrics {
    /// Common baseline metrics (e.g. elapsed compute time, output row count)
    baseline: BaselineMetrics,

    /// Metrics tracking spill activity (spilled bytes, rows, and file count)
    spill_metrics: SpillMetrics,
}

impl ExternalSorterMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            spill_metrics: SpillMetrics::new(metrics, partition),
        }
    }
}

/// Sorts an arbitrarily sized, unsorted stream of [`RecordBatch`]es to
/// a total order. Depending on the input size and memory manager
/// configuration, writes intermediate results to disk ("spills")
/// using Arrow IPC format.
///
/// # Algorithm
///
/// 1. get a non-empty new batch from input
///
/// 2. check with the memory manager there is sufficient space to
///    buffer the batch in memory.
///
/// 2.1 if memory is sufficient, buffer batch in memory, go to 1.
///
/// 2.2 if no more memory is available, sort all buffered batches and
///     spill to file.  buffer the next batch in memory, go to 1.
///
/// 3. when input is exhausted, merge all in memory batches and spills
///    to get a total order, as sketched below.
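///
/// In pseudocode, a sketch of the control flow implemented by
/// [`Self::insert_batch`] and [`Self::sort`] (names are illustrative only):
///
/// ```text
/// while let Some(batch) = input.next() {
///     if !try_reserve_memory(&batch) {
///         sort_and_spill(&mut in_mem_batches);
///     }
///     in_mem_batches.push(batch);
/// }
/// merge(in_mem_batches, spill_files) // total sorted output
/// ```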
///
/// # When data fits in available memory
///
/// If there is sufficient memory, data is sorted in memory to produce the output
///
/// ```text
///    ┌─────┐
///    │  2  │
///    │  3  │
///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///    │  4  │
///    │  2  │                  │
///    └─────┘                  ▼
///    ┌─────┐
///    │  1  │              In memory
///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
///    │  1  │
///    └─────┘                  ▲
///      ...                    │
///
///    ┌─────┐                  │
///    │  4  │
///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///    └─────┘
///
/// in_mem_batches
/// ```
///
/// # When data does not fit in available memory
///
///  When memory is exhausted, data is first sorted and written to one
///  or more spill files on disk:
///
/// ```text
///    ┌─────┐                               .─────────────────.
///    │  2  │                              (                   )
///    │  3  │                              │`─────────────────'│
///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
///    │  4  │             │                │  │ 1  │░          │
///    │  2  │                              │  │... │░          │
///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
///    ┌─────┐                              │  └────┘░    1  │░ │
///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
///    │  1  │     and write to file        │           │    ░░ │
///    └─────┘                              │             4  │░ │
///      ...               ▲                │           └░─░─░░ │
///                        │                │            ░░░░░░ │
///    ┌─────┐                              │.─────────────────.│
///    │  4  │             │                (                   )
///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
///    └─────┘
///
/// in_mem_batches                                  spills
///                                         (file on disk in Arrow
///                                               IPC format)
/// ```
///
/// Once the input is completely read, the spill files are read and
/// merged with any in memory batches to produce a single total sorted
/// output:
///
/// ```text
///   .─────────────────.
///  (                   )
///  │`─────────────────'│
///  │  ┌────┐           │
///  │  │ 1  │░          │
///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
///  │  │ 4  │░ ┌────┐   │           │
///  │  └────┘░ │ 1  │░  │           ▼
///  │   ░░░░░░ │    │░  │
///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
///  │          │    │░  │
///  │          │ 4  │░  │           ▲
///  │          └────┘░  │           │
///  │           ░░░░░░  │
///  │.─────────────────.│           │
///  (                   )
///   `─────────────────'            │
///         spills
///                                  │
///
///                                  │
///
///     ┌─────┐                      │
///     │  1  │
///     │  4  │─ ─ ─ ─               │
///     └─────┘       │
///       ...                   In memory
///                   └ ─ ─ ─▶  sort/merge
///     ┌─────┐
///     │  4  │                      ▲
///     │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     └─────┘
///
///  in_mem_batches
/// ```
struct ExternalSorter {
    // ========================================================================
    // PROPERTIES:
    // Fields that define the sorter's configuration and remain constant
    // ========================================================================
    /// Schema of the output (and the input)
    schema: SchemaRef,
    /// Sort expressions
    expr: LexOrdering,
    /// The target number of rows for output batches
    batch_size: usize,
    /// If the total size of the buffered in-memory batches is below this
    /// threshold, the data will be concatenated and sorted in place rather
    /// than sort/merged.
    sort_in_place_threshold_bytes: usize,

    // ========================================================================
    // STATE BUFFERS:
    // Fields that hold intermediate data during sorting
    // ========================================================================
    /// Unsorted input batches stored in the memory buffer
    in_mem_batches: Vec<RecordBatch>,

    /// During external sorting, in-memory intermediate data will be appended to
    /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
    ///
    /// This is a tuple of:
    /// 1. `InProgressSpillFile` - the file that is being written to
    /// 2. `max_record_batch_memory` - the maximum memory usage of a single batch in this spill file.
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    /// If data has previously been spilled, the locations of the spill files (in
    /// Arrow IPC format).
    /// Within the same spill file, the data might be chunked into multiple batches,
    /// and ordered by sort keys.
    finished_spill_files: Vec<SortedSpillFile>,

    // ========================================================================
    // EXECUTION RESOURCES:
    // Fields related to managing execution resources and monitoring performance.
    // ========================================================================
    /// Runtime metrics
    metrics: ExternalSorterMetrics,
    /// A handle to the runtime to get spill files
    runtime: Arc<RuntimeEnv>,
    /// Reservation for in_mem_batches
    reservation: MemoryReservation,
    /// Manages spill files on disk
    spill_manager: SpillManager,

    /// Reservation for the merging of in-memory batches. If the sort
    /// might spill, `sort_spill_reservation_bytes` will be
    /// pre-reserved to ensure there is some space for this sort/merge.
    merge_reservation: MemoryReservation,
    /// How much memory to reserve for performing in-memory sort/merges
    /// prior to spilling.
    sort_spill_reservation_bytes: usize,
}

impl ExternalSorter {
    // TODO: make a builder or some other nicer API to avoid the
    // clippy warning
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        // Configured via `datafusion.execution.spill_compression`.
        spill_compression: SpillCompression,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<Self> {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);

        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);

        let spill_manager = SpillManager::new(
            Arc::clone(&runtime),
            metrics.spill_metrics.clone(),
            Arc::clone(&schema),
        )
        .with_compression_type(spill_compression);

        Ok(Self {
            schema,
            in_mem_batches: vec![],
            in_progress_spill_file: None,
            finished_spill_files: vec![],
            expr,
            metrics,
            reservation,
            spill_manager,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        })
    }

    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
    ///
    /// Updates memory usage metrics, and possibly triggers spilling to disk
    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
        if input.num_rows() == 0 {
            return Ok(());
        }

        self.reserve_memory_for_merge()?;
        self.reserve_memory_for_batch_and_maybe_spill(&input)
            .await?;

        self.in_mem_batches.push(input);
        Ok(())
    }

    fn spilled_before(&self) -> bool {
        !self.finished_spill_files.is_empty()
    }

    /// Returns the final sorted output of all batches inserted via
    /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
    ///
    /// This process could either be:
    ///
    /// 1. An in-memory sort/merge (if the input fit in memory)
    ///
    /// 2. A combined streaming merge incorporating both in-memory
    ///    batches and data from spill files on disk.
    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
        // Release the memory reserved for merge back to the pool so
        // there is some left when `in_mem_sort_stream` requests an
        // allocation.
        self.merge_reservation.free();

        if self.spilled_before() {
            // Sort `in_mem_batches` and spill it first. If there are many
            // `in_mem_batches` and the memory limit is almost reached, merging
            // them with the spilled files at the same time might cause OOM.
            if !self.in_mem_batches.is_empty() {
                self.sort_and_spill_in_mem_batches().await?;
            }

            StreamingMergeBuilder::new()
                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
                .with_spill_manager(self.spill_manager.clone())
                .with_schema(Arc::clone(&self.schema))
                .with_expressions(&self.expr.clone())
                .with_metrics(self.metrics.baseline.clone())
                .with_batch_size(self.batch_size)
                .with_fetch(None)
                .with_reservation(self.merge_reservation.new_empty())
                .build()
        } else {
            self.in_mem_sort_stream(self.metrics.baseline.clone())
        }
    }

    /// How much memory is buffered in this `ExternalSorter`?
    fn used(&self) -> usize {
        self.reservation.size()
    }

    /// How many bytes have been spilled to disk?
    fn spilled_bytes(&self) -> usize {
        self.metrics.spill_metrics.spilled_bytes.value()
    }

    /// How many rows have been spilled to disk?
    fn spilled_rows(&self) -> usize {
        self.metrics.spill_metrics.spilled_rows.value()
    }

    /// How many spill files have been created?
    fn spill_count(&self) -> usize {
        self.metrics.spill_metrics.spill_file_count.value()
    }

    /// Appends globally sorted batches to the in-progress spill file, and clears
    /// `globally_sorted_batches` (and its memory reservation) afterwards.
    async fn consume_and_spill_append(
        &mut self,
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        if globally_sorted_batches.is_empty() {
            return Ok(());
        }

        // Lazily initialize the in-progress spill file
        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
        }

        Self::organize_stringview_arrays(globally_sorted_batches)?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let batches_to_spill = std::mem::take(globally_sorted_batches);
        self.reservation.free();

        let (in_progress_file, max_record_batch_size) =
            self.in_progress_spill_file.as_mut().ok_or_else(|| {
                internal_datafusion_err!("In-progress spill file should be initialized")
            })?;

        for batch in batches_to_spill {
            in_progress_file.append_batch(&batch)?;

            *max_record_batch_size =
                (*max_record_batch_size).max(batch.get_sliced_size()?);
        }

        assert_or_internal_err!(
            globally_sorted_batches.is_empty(),
            "This function consumes globally_sorted_batches, so it should be empty after taking."
        );

        Ok(())
    }

    /// Finishes the in-progress spill file and moves it to the finished spill files.
    async fn spill_finish(&mut self) -> Result<()> {
        let (mut in_progress_file, max_record_batch_memory) =
            self.in_progress_spill_file.take().ok_or_else(|| {
                internal_datafusion_err!(
                    "Should be called after `consume_and_spill_append`"
                )
            })?;
        let spill_file = in_progress_file.finish()?;

        if let Some(spill_file) = spill_file {
            self.finished_spill_files.push(SortedSpillFile {
                file: spill_file,
                max_record_batch_memory,
            });
        }

        Ok(())
    }

    /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
    /// `StringViewArray` in sequential order by calling `gc()` on them.
    ///
    /// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
    /// available
    ///
    /// # Rationale
    /// After (merge-based) sorting, all batches will be sorted into a single run,
    /// but physically this sorted run is chunked into many small batches. For
    /// `StringViewArray`s inside each sorted run, their inner buffers are not
    /// re-constructed by default, leading to non-sequential payload locations
    /// (permuted by the `interleave()` Arrow kernel). A single payload buffer might
    /// be shared by multiple `RecordBatch`es.
    /// When writing each batch to disk, the writer has to write all referenced buffers,
    /// because they have to be read back one by one to reduce memory usage. This
    /// causes extra disk reads and writes, and potentially execution failure.
    ///
    /// # Example
    /// Before sorting:
    ///
    /// ```text
    /// batch1 -> buffer1
    /// batch2 -> buffer2
    /// ```
    ///
    /// After sorting:
    ///
    /// ```text
    /// sorted_batch1 -> buffer1
    ///               -> buffer2
    /// sorted_batch2 -> buffer1
    ///               -> buffer2
    /// ```
    ///
    /// Then when spilling each batch, the writer has to write all referenced buffers
    /// repeatedly.
    fn organize_stringview_arrays(
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

        for batch in globally_sorted_batches.drain(..) {
            let mut new_columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(batch.num_columns());

            let mut arr_mutated = false;
            for array in batch.columns() {
                if let Some(string_view_array) =
                    array.as_any().downcast_ref::<StringViewArray>()
                {
                    let new_array = string_view_array.gc();
                    new_columns.push(Arc::new(new_array));
                    arr_mutated = true;
                } else {
                    new_columns.push(Arc::clone(array));
                }
            }

            let organized_batch = if arr_mutated {
                RecordBatch::try_new(batch.schema(), new_columns)?
            } else {
                batch
            };

            organized_batches.push(organized_batch);
        }

        *globally_sorted_batches = organized_batches;

        Ok(())
    }

    /// Sorts the in-memory batches and merges them into a single sorted run, then writes
    /// the result to spill files.
    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
        assert_or_internal_err!(
            !self.in_mem_batches.is_empty(),
            "in_mem_batches must not be empty when attempting to sort and spill"
        );

        // Release the memory reserved for merge back to the pool so
        // there is some left when `in_mem_sort_stream` requests an
        // allocation. At the end of this function, memory will be
        // reserved again for the next spill.
        self.merge_reservation.free();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` have
        // been taken to construct a globally sorted stream.
        assert_or_internal_err!(
            self.in_mem_batches.is_empty(),
            "in_mem_batches should be empty after constructing sorted stream"
        );
        // 'global' here refers to all buffered batches when the memory limit is
        // reached. This variable will buffer the sorted batches after
        // sort-preserving merge and incrementally append to spill files.
        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
            if self.reservation.try_grow(sorted_size).is_err() {
                // Although the reservation is not enough, the batch is
                // already in memory, so it's okay to combine it with previously
                // sorted batches, and spill together.
                globally_sorted_batches.push(batch);
                self.consume_and_spill_append(&mut globally_sorted_batches)
                    .await?; // reservation is freed in `consume_and_spill_append()`
            } else {
                globally_sorted_batches.push(batch);
            }
        }

        // Drop early to free up memory reserved by the sorted stream, otherwise the
        // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
        drop(sorted_stream);

        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?;
        self.spill_finish().await?;

        // Sanity check after spilling
        let buffers_cleared_property =
            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
        assert_or_internal_err!(
            buffers_cleared_property,
            "in_mem_batches and globally_sorted_batches should both be cleared at this point"
        );

        // Reserve headroom for next sort/merge
        self.reserve_memory_for_merge()?;

        Ok(())
    }

    /// Consumes `in_mem_batches`, returning a sorted stream of
    /// batches. This proceeds in one of two ways:
    ///
    /// # Small Datasets
    ///
    /// For "smaller" datasets, the data is first concatenated into a
    /// single batch and then sorted. This is often faster than
    /// sorting and then merging.
    ///
    /// ```text
    ///        ┌─────┐
    ///        │  2  │
    ///        │  3  │
    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
    ///        │  4  │                     │  2  │
    ///        │  2  │        │            │  3  │
    ///        └─────┘                     │  1  │             sorted output
    ///        ┌─────┐        ▼            │  4  │                stream
    ///        │  1  │                     │  2  │
    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
    ///        │  1  │                     │  4  │
    ///        └─────┘        ▲            │  1  │
    ///          ...          │            │ ... │
    ///                                    │  4  │
    ///        ┌─────┐        │            │  3  │
    ///        │  4  │                     └─────┘
    ///        │  3  │─ ─ ─ ─ ┘
    ///        └─────┘
    ///     in_mem_batches
    /// ```
    ///
    /// # Larger datasets
    ///
    /// For larger datasets, the batches are first sorted individually
    /// and then merged together.
    ///
    /// ```text
    ///      ┌─────┐                ┌─────┐
    ///      │  2  │                │  1  │
    ///      │  3  │                │  2  │
    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
    ///      │  4  │                │  3  │
    ///      │  2  │                │  4  │          │
    ///      └─────┘                └─────┘               sorted output
    ///      ┌─────┐                ┌─────┐          ▼       stream
    ///      │  1  │                │  1  │
    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
    ///      │  1  │                │  4  │
    ///      └─────┘                └─────┘          ▲
    ///        ...       ...         ...             │
    ///
    ///      ┌─────┐                ┌─────┐          │
    ///      │  4  │                │  3  │
    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
    ///      └─────┘                └─────┘
    ///
    ///   in_mem_batches
    /// ```
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }

        // The elapsed compute timer is updated when the value is dropped.
        // There is no need for an explicit call to drop.
        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();

        // Note that no operation inside `in_mem_sort_stream` performs any memory
        // reservation. This avoids having to handle reservation failure and
        // spilling in the middle of the sort/merge. The memory space for batches
        // produced by the resulting stream will be reserved by the consumer of
        // the stream.

        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            // Concatenate memory batches together and sort
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            self.reservation
                .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
                .map_err(Self::err_with_oom_context)?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                let reservation = self
                    .reservation
                    .split(get_reserved_bytes_for_record_batch(&batch)?);
                let input = self.sort_batch_stream(batch, &metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;

        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(&self.expr.clone())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(None)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }

    /// Sorts a single `RecordBatch` into a single stream.
    ///
    /// This may output multiple batches depending on the size of the
    /// sorted data and the target batch size.
    /// For single-batch output cases, `reservation` will be freed immediately after sorting,
    /// as the batch will be output and is expected to be reserved by the consumer of the stream.
    /// For multi-batch output cases, `reservation` will be grown to match the actual
    /// size of sorted output, and as each batch is output, its memory will be freed from the reservation.
    /// (This leads to the same behaviour, as futures are only evaluated when polled by the consumer.)
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: &BaselineMetrics,
        mut reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(
            get_reserved_bytes_for_record_batch(&batch)?,
            reservation.size()
        );

        let schema = batch.schema();
        let expressions = self.expr.clone();
        let batch_size = self.batch_size;
        let output_row_metrics = metrics.output_rows().clone();

        let stream = futures::stream::once(async move {
            let schema = batch.schema();

            // Sort the batch immediately and get all output batches
            let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
            drop(batch);

            // Free the old reservation; it is re-grown below to match the
            // actual sorted output size
            reservation.free();

            Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation))
        })
        .then({
            move |batches| async move {
                match batches {
                    Ok((schema, sorted_batches, mut reservation)) => {
                        // Calculate the total size of sorted batches
                        let total_sorted_size: usize = sorted_batches
                            .iter()
                            .map(get_record_batch_memory_size)
                            .sum();
                        reservation
                            .try_grow(total_sorted_size)
                            .map_err(Self::err_with_oom_context)?;

                        // Wrap in ReservationStream to hold the reservation
                        Ok(Box::pin(ReservationStream::new(
                            Arc::clone(&schema),
                            Box::pin(RecordBatchStreamAdapter::new(
                                schema,
                                futures::stream::iter(sorted_batches.into_iter().map(Ok)),
                            )),
                            reservation,
                        )) as SendableRecordBatchStream)
                    }
                    Err(e) => Err(e),
                }
            }
        })
        .try_flatten()
        .map(move |batch| match batch {
            Ok(batch) => {
                output_row_metrics.add(batch.num_rows());
                Ok(batch)
            }
            Err(e) => Err(e),
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    /// If this sort may spill, pre-allocates
    /// `sort_spill_reservation_bytes` of memory to guarantee memory
    /// left for the in memory sort/merge.
    fn reserve_memory_for_merge(&mut self) -> Result<()> {
        // Reserve headroom for next merge sort
        if self.runtime.disk_manager.tmp_files_enabled() {
            let size = self.sort_spill_reservation_bytes;
            if self.merge_reservation.size() != size {
                self.merge_reservation
                    .try_resize(size)
                    .map_err(Self::err_with_oom_context)?;
            }
        }

        Ok(())
    }

    /// Reserves memory to be able to accommodate the given batch.
    /// If memory is scarce, tries to spill current in-memory batches to disk first.
    async fn reserve_memory_for_batch_and_maybe_spill(
        &mut self,
        input: &RecordBatch,
    ) -> Result<()> {
        let size = get_reserved_bytes_for_record_batch(input)?;

        match self.reservation.try_grow(size) {
            Ok(_) => Ok(()),
            Err(e) => {
                if self.in_mem_batches.is_empty() {
                    return Err(Self::err_with_oom_context(e));
                }

                // Spill and try again.
                self.sort_and_spill_in_mem_batches().await?;
                self.reservation
                    .try_grow(size)
                    .map_err(Self::err_with_oom_context)
            }
        }
    }

    /// Wraps the error with a context message suggesting settings to tweak.
    /// This is meant to be used with `DataFusionError::ResourcesExhausted` only.
    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
        match e {
            DataFusionError::ResourcesExhausted(_) => e.context(
                "Not enough memory to continue external sort. \
                    Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
            ),
            // This is not an OOM error, so just return it as is.
            _ => e,
        }
    }
}

/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
/// creating sorted copies of sorted columns in record batches for speeding up comparison
/// in sorting and merging. The sorted copies are in either row format or array format.
/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
/// sorted copies are, they will use more memory than the original record batch.
///
/// This can basically be calculated as the sum of the actual space it takes in
/// memory (which would be larger for a sliced batch), and the size of the actual data.
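///
/// For example (illustrative numbers only): a batch whose in-memory footprint
/// is 1 MiB and whose sliced size is also 1 MiB reserves 2 MiB in total, i.e.
/// roughly 2x headroom over the raw data.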
pub(crate) fn get_reserved_bytes_for_record_batch_size(
    record_batch_size: usize,
    sliced_size: usize,
) -> usize {
    // Even 2x may not be enough for some cases, but it's a good enough estimation as a baseline.
    // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
    // to compensate for the extra memory needed.
    record_batch_size + sliced_size
}

/// Estimate how much memory is needed to sort a `RecordBatch`.
/// This will just call `get_reserved_bytes_for_record_batch_size` with the
/// memory size of the record batch and its sliced size.
pub(super) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result<usize> {
    Ok(get_reserved_bytes_for_record_batch_size(
        get_record_batch_memory_size(batch),
        batch.get_sliced_size()?,
    ))
}

impl Debug for ExternalSorter {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("ExternalSorter")
            .field("memory_used", &self.used())
            .field("spilled_bytes", &self.spilled_bytes())
            .field("spilled_rows", &self.spilled_rows())
            .field("spill_count", &self.spill_count())
            .finish()
    }
}

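/// Sorts a single [`RecordBatch`] by `expressions`, optionally keeping only
/// the first `fetch` rows of the sorted result.
///
/// A minimal sketch of a call site (not a doctest; it assumes a `batch` and a
/// `LexOrdering` named `ordering` are already in scope):
///
/// ```text
/// // Full sort:
/// let sorted = sort_batch(&batch, &ordering, None)?;
/// // Keep only the 10 smallest rows according to `ordering`:
/// let top10 = sort_batch(&batch, &ordering, Some(10))?;
/// ```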
pub fn sort_batch(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    fetch: Option<usize>,
) -> Result<RecordBatch> {
    let sort_columns = expressions
        .iter()
        .map(|expr| expr.evaluate_to_sort_column(batch))
        .collect::<Result<Vec<_>>>()?;

    let indices = lexsort_to_indices(&sort_columns, fetch)?;
    let columns = take_arrays(batch.columns(), &indices, None)?;

    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
    Ok(RecordBatch::try_new_with_options(
        batch.schema(),
        columns,
        &options,
    )?)
}

/// Sort a batch and return the result as multiple batches of at most `batch_size` rows each.
/// This is useful when you want to avoid creating one large sorted batch in memory,
/// and instead want to process the sorted data in smaller chunks.
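///
/// A minimal sketch of a call site (not a doctest; it assumes a `batch` and a
/// `LexOrdering` named `ordering` are already in scope):
///
/// ```text
/// // Produce the sorted result as chunks of at most 1024 rows each:
/// let chunks = sort_batch_chunked(&batch, &ordering, 1024)?;
/// assert!(chunks.iter().all(|c| c.num_rows() <= 1024));
/// ```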
pub fn sort_batch_chunked(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    batch_size: usize,
) -> Result<Vec<RecordBatch>> {
    let sort_columns = expressions
        .iter()
        .map(|expr| expr.evaluate_to_sort_column(batch))
        .collect::<Result<Vec<_>>>()?;

    let indices = lexsort_to_indices(&sort_columns, None)?;

    // Split indices into chunks of batch_size
    let num_rows = indices.len();
    let num_chunks = num_rows.div_ceil(batch_size);

    let result_batches = (0..num_chunks)
        .map(|chunk_idx| {
            let start = chunk_idx * batch_size;
            let end = (start + batch_size).min(num_rows);
            let chunk_len = end - start;

            // Create a slice of indices for this chunk
            let chunk_indices = indices.slice(start, chunk_len);

            // Take the columns using this chunk of indices
            let columns = take_arrays(batch.columns(), &chunk_indices, None)?;

            let options = RecordBatchOptions::new().with_row_count(Some(chunk_len));
            let chunk_batch =
                RecordBatch::try_new_with_options(batch.schema(), columns, &options)?;

            Ok(chunk_batch)
        })
        .collect::<Result<Vec<RecordBatch>>>()?;

    Ok(result_batches)
}

/// Sort execution plan.
///
/// Supports sorting datasets that are larger than the memory allotted
/// by the memory manager, by spilling to disk.
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input execution plan
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions
    expr: LexOrdering,
    /// Set of all metrics created during the sort
    metrics_set: ExecutionPlanMetricsSet,
    /// Preserve partitions of input plan. If false, the input partitions
    /// will be sorted and merged into a single output partition.
    preserve_partitioning: bool,
    /// Fetch highest/lowest n results
    fetch: Option<usize>,
    /// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
    common_sort_prefix: Vec<PhysicalSortExpr>,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
    /// Filter matching the state of the sort for dynamic filter pushdown.
    /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
    /// If `fetch` is `None`, this will be `None`.
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}

impl SortExec {
    /// Create a new sort execution plan that produces a single,
    /// sorted output partition.
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let preserve_partitioning = false;
        let (cache, sort_prefix) =
            Self::compute_properties(&input, expr.clone(), preserve_partitioning)
                .unwrap();
        Self {
            expr,
            input,
            metrics_set: ExecutionPlanMetricsSet::new(),
            preserve_partitioning,
            fetch: None,
            common_sort_prefix: sort_prefix,
            cache,
            filter: None,
        }
    }

    /// Whether this `SortExec` preserves partitioning of the children
    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    /// Specify the partitioning behavior of this sort exec
    ///
    /// If `preserve_partitioning` is true, sorts each partition
    /// individually, producing one sorted stream for each input partition.
    ///
    /// If `preserve_partitioning` is false, sorts and merges all
    /// input partitions producing a single, sorted partition.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        self.cache = self
            .cache
            .with_partitioning(Self::output_partitioning_helper(
                &self.input,
                self.preserve_partitioning,
            ));
        self
    }

    /// Creates a fresh `TopKDynamicFilters` (used to set or reset `self.filter`).
    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
        let children = self
            .expr
            .iter()
            .map(|sort_expr| Arc::clone(&sort_expr.expr))
            .collect::<Vec<_>>();
        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(children, lit(true)),
        ))))
    }

    fn cloned(&self) -> Self {
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            common_sort_prefix: self.common_sort_prefix.clone(),
            fetch: self.fetch,
            cache: self.cache.clone(),
            filter: self.filter.clone(),
        }
    }

    /// Modify how many rows to include in the result
    ///
    /// If None, then all rows will be returned, in sorted order.
    /// If Some, then only the top `fetch` rows will be returned.
    /// This can reduce the memory pressure required by the sort
    /// operation since rows that are not going to be included
    /// can be dropped.
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = self.cache.clone();
        // If the SortExec can emit incrementally (that means the sort requirements
        // and properties of the input match), the SortExec can generate its result
        // without scanning the entire input when a fetch value exists.
        let is_pipeline_friendly = matches!(
            self.cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        let filter = fetch.is_some().then(|| {
            // If we already have a filter, keep it. Otherwise, create a new one.
            self.filter.clone().unwrap_or_else(|| self.create_filter())
        });
        let mut new_sort = self.cloned();
        new_sort.fetch = fetch;
        new_sort.cache = cache;
        new_sort.filter = filter;
        new_sort
    }

    /// Input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions
    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    /// If `Some(fetch)`, limits output to only the first "fetch" items
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn output_partitioning_helper(
        input: &Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Partitioning {
        // Get output partitioning:
        if preserve_partitioning {
            input.output_partitioning().clone()
        } else {
            Partitioning::UnknownPartitioning(1)
        }
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    /// It also returns the common sort prefix between the input and the sort expressions.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
        let (sort_prefix, sort_satisfied) = input
            .equivalence_properties()
            .extract_common_sort_prefix(sort_exprs.clone())?;

        // The emission type depends on whether the input is already sorted:
        // - If already fully sorted, we can emit results in the same way as the input
        // - If not sorted, we must wait until all data is processed to emit results (Final)
        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };

        // The boundedness depends on whether the input is already sorted:
        // - If already sorted, we have the same property as the input
        // - If not sorted and the input is unbounded, the sort requires infinite
        //   memory and generates unbounded data (not practical).
        // - If not sorted and the input is bounded, then the SortExec is bounded, too.
        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };

        // Calculate equivalence properties; i.e. reset the ordering equivalence
        // class with the new ordering:
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.reorder(sort_exprs)?;

        // Get output partitioning:
        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);

        Ok((
            PlanProperties::new(
                eq_properties,
                output_partitioning,
                emission_type,
                boundedness,
            ),
            sort_prefix,
        ))
    }
}

impl DisplayAs for SortExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let preserve_partitioning = self.preserve_partitioning;
                match self.fetch {
                    Some(fetch) => {
                        write!(
                            f,
                            "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                            self.expr
                        )?;
                        if let Some(filter) = &self.filter
                            && let Ok(current) = filter.read().expr().current()
                            && !current.eq(&lit(true))
                        {
                            write!(f, ", filter=[{current}]")?;
                        }
                        if !self.common_sort_prefix.is_empty() {
                            write!(f, ", sort_prefix=[")?;
                            let mut first = true;
                            for sort_expr in &self.common_sort_prefix {
                                if first {
                                    first = false;
                                } else {
                                    write!(f, ", ")?;
                                }
                                write!(f, "{sort_expr}")?;
                            }
                            write!(f, "]")
                        } else {
                            Ok(())
                        }
                    }
                    None => write!(
                        f,
                        "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                        self.expr
                    ),
                }
            }
            DisplayFormatType::TreeRender => match self.fetch {
                Some(fetch) => {
                    writeln!(f, "{}", self.expr)?;
                    writeln!(f, "limit={fetch}")
                }
                None => {
                    writeln!(f, "{}", self.expr)
                }
            },
        }
    }
}

impl ExecutionPlan for SortExec {
    fn name(&self) -> &'static str {
        match self.fetch {
            Some(_) => "SortExec(TopK)",
            None => "SortExec",
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            // global sort
            // TODO support RangePartition and OrderedDistribution
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut new_sort = self.cloned();
        assert_eq!(children.len(), 1, "SortExec should have exactly one child");
        new_sort.input = Arc::clone(&children[0]);
        // Recompute the properties based on the new input since they may have changed
        let (cache, sort_prefix) = Self::compute_properties(
            &new_sort.input,
            new_sort.expr.clone(),
            new_sort.preserve_partitioning,
        )?;
        new_sort.cache = cache;
        new_sort.common_sort_prefix = sort_prefix;

        Ok(Arc::new(new_sort))
    }

    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        let new_sort = self.with_new_children(children)?;
        let mut new_sort = new_sort
            .as_any()
            .downcast_ref::<SortExec>()
            .expect("cloned 1 line above this line, we know the type")
            .clone();
        // Our dynamic filter and execution metrics are the state we need to reset.
        new_sort.filter = Some(new_sort.create_filter());
        new_sort.metrics_set = ExecutionPlanMetricsSet::new();

        Ok(Arc::new(new_sort))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start SortExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {partition}");

        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy(self.expr.clone())?;

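        // Four execution strategies, depending on whether the input already
        // satisfies the required ordering and whether a fetch limit is set:
        // - sorted input + fetch:   only apply a limit to the input stream
        // - sorted input, no fetch: pass the input through unchanged
        // - unsorted input + fetch: use the TopK operator
        // - unsorted input, no fetch: full external sort via `ExternalSorter`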
        match (sort_satisfied, self.fetch.as_ref()) {
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            (true, None) => Ok(input),
            (false, Some(fetch)) => {
                let filter = self.filter.clone();
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.common_sort_prefix.clone(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                    Arc::clone(&unwrap_or_internal_err!(filter)),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                            if topk.finished {
                                break;
                            }
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    context.session_config().spill_compression(),
                    &self.metrics_set,
                    context.runtime_env(),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort().await
                    })
                    .try_flatten(),
                )))
            }
        }
    }
1350
1351    fn metrics(&self) -> Option<MetricsSet> {
1352        Some(self.metrics_set.clone_inner())
1353    }
1354
1355    fn statistics(&self) -> Result<Statistics> {
1356        self.partition_statistics(None)
1357    }
1358
1359    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1360        if !self.preserve_partitioning() {
1361            return self
1362                .input
1363                .partition_statistics(None)?
1364                .with_fetch(self.fetch, 0, 1);
1365        }
1366        self.input
1367            .partition_statistics(partition)?
1368            .with_fetch(self.fetch, 0, 1)
1369    }
1370
1371    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
1372        Some(Arc::new(SortExec::with_fetch(self, limit)))
1373    }
1374
1375    fn fetch(&self) -> Option<usize> {
1376        self.fetch
1377    }
1378
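    /// Sorting by itself never changes the number of rows; with a fetch it can
    /// only reduce them, hence `LowerEqual`.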
1379    fn cardinality_effect(&self) -> CardinalityEffect {
1380        if self.fetch.is_none() {
1381            CardinalityEffect::Equal
1382        } else {
1383            CardinalityEffect::LowerEqual
1384        }
1385    }
1386
1387    /// Tries to swap the projection with its input [`SortExec`]. If it can be done,
1388    /// it returns the new swapped version having the [`SortExec`] as the top plan.
1389    /// Otherwise, it returns None.
1390    fn try_swapping_with_projection(
1391        &self,
1392        projection: &ProjectionExec,
1393    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1394        // If the projection does not narrow the schema, we should not try to push it down.
1395        if projection.expr().len() >= projection.input().schema().fields().len() {
1396            return Ok(None);
1397        }
1398
1399        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
1400        else {
1401            return Ok(None);
1402        };
1403
1404        Ok(Some(Arc::new(
1405            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
1406                .with_fetch(self.fetch())
1407                .with_preserve_partitioning(self.preserve_partitioning()),
1408        )))
1409    }
1410
1411    fn gather_filters_for_pushdown(
1412        &self,
1413        phase: FilterPushdownPhase,
1414        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
1415        config: &datafusion_common::config::ConfigOptions,
1416    ) -> Result<FilterDescription> {
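        // Parent filters can be forwarded to the child in any phase; the TopK
        // dynamic filter below is self-generated and is only attached in the
        // post-optimization phase, and only when its pushdown is enabled.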
1417        if !matches!(phase, FilterPushdownPhase::Post) {
1418            return FilterDescription::from_children(parent_filters, &self.children());
1419        }
1420
1421        let mut child =
1422            ChildFilterDescription::from_child(&parent_filters, self.input())?;
1423
1424        if let Some(filter) = &self.filter
1425            && config.optimizer.enable_topk_dynamic_filter_pushdown
1426        {
1427            child = child.with_self_filter(filter.read().expr());
1428        }
1429
1430        Ok(FilterDescription::new().with_child(child))
1431    }
1432}
1433
1434#[cfg(test)]
1435mod tests {
1436    use std::collections::HashMap;
1437    use std::pin::Pin;
1438    use std::task::{Context, Poll};
1439
1440    use super::*;
1441    use crate::coalesce_partitions::CoalescePartitionsExec;
1442    use crate::collect;
1443    use crate::execution_plan::Boundedness;
1444    use crate::expressions::col;
1445    use crate::test;
1446    use crate::test::TestMemoryExec;
1447    use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
1448    use crate::test::{assert_is_pending, make_partition};
1449
1450    use arrow::array::*;
1451    use arrow::compute::SortOptions;
1452    use arrow::datatypes::*;
1453    use datafusion_common::cast::as_primitive_array;
1454    use datafusion_common::test_util::batches_to_string;
1455    use datafusion_common::{DataFusionError, Result, ScalarValue};
1456    use datafusion_execution::RecordBatchStream;
1457    use datafusion_execution::config::SessionConfig;
1458    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1459    use datafusion_physical_expr::EquivalenceProperties;
1460    use datafusion_physical_expr::expressions::{Column, Literal};
1461
1462    use futures::{FutureExt, Stream};
1463    use insta::assert_snapshot;
1464
1465    #[derive(Debug, Clone)]
1466    pub struct SortedUnboundedExec {
1467        schema: Schema,
1468        batch_size: u64,
1469        cache: PlanProperties,
1470    }
1471
1472    impl DisplayAs for SortedUnboundedExec {
1473        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1474            match t {
1475                DisplayFormatType::Default
1476                | DisplayFormatType::Verbose
1477                | DisplayFormatType::TreeRender => write!(f, "UnboundableExec")?,
1478            }
1479            Ok(())
1480        }
1481    }
1482
1483    impl SortedUnboundedExec {
1484        fn compute_properties(schema: SchemaRef) -> PlanProperties {
1485            let mut eq_properties = EquivalenceProperties::new(schema);
1486            eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
1487                Column::new("c1", 0),
1488            ))]);
1489            PlanProperties::new(
1490                eq_properties,
1491                Partitioning::UnknownPartitioning(1),
1492                EmissionType::Final,
1493                Boundedness::Unbounded {
1494                    requires_infinite_memory: false,
1495                },
1496            )
1497        }
1498    }
1499
1500    impl ExecutionPlan for SortedUnboundedExec {
1501        fn name(&self) -> &'static str {
1502            Self::static_name()
1503        }
1504
1505        fn as_any(&self) -> &dyn Any {
1506            self
1507        }
1508
1509        fn properties(&self) -> &PlanProperties {
1510            &self.cache
1511        }
1512
1513        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1514            vec![]
1515        }
1516
1517        fn with_new_children(
1518            self: Arc<Self>,
1519            _: Vec<Arc<dyn ExecutionPlan>>,
1520        ) -> Result<Arc<dyn ExecutionPlan>> {
1521            Ok(self)
1522        }
1523
1524        fn execute(
1525            &self,
1526            _partition: usize,
1527            _context: Arc<TaskContext>,
1528        ) -> Result<SendableRecordBatchStream> {
1529            Ok(Box::pin(SortedUnboundedStream {
1530                schema: Arc::new(self.schema.clone()),
1531                batch_size: self.batch_size,
1532                offset: 0,
1533            }))
1534        }
1535    }
1536
1537    #[derive(Debug)]
1538    pub struct SortedUnboundedStream {
1539        schema: SchemaRef,
1540        batch_size: u64,
1541        offset: u64,
1542    }
1543
1544    impl Stream for SortedUnboundedStream {
1545        type Item = Result<RecordBatch>;
1546
1547        fn poll_next(
1548            mut self: Pin<&mut Self>,
1549            _cx: &mut Context<'_>,
1550        ) -> Poll<Option<Self::Item>> {
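            // Always yields another batch and never returns `Poll::Ready(None)`:
            // the stream is infinite, so only fetch-limited consumers terminate.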
1551            let batch = SortedUnboundedStream::create_record_batch(
1552                Arc::clone(&self.schema),
1553                self.offset,
1554                self.batch_size,
1555            );
1556            self.offset += self.batch_size;
1557            Poll::Ready(Some(Ok(batch)))
1558        }
1559    }
1560
1561    impl RecordBatchStream for SortedUnboundedStream {
1562        fn schema(&self) -> SchemaRef {
1563            Arc::clone(&self.schema)
1564        }
1565    }
1566
1567    impl SortedUnboundedStream {
1568        fn create_record_batch(
1569            schema: SchemaRef,
1570            offset: u64,
1571            batch_size: u64,
1572        ) -> RecordBatch {
1573            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1574            let array = UInt64Array::from(values);
1575            let array_ref: ArrayRef = Arc::new(array);
1576            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1577        }
1578    }
1579
1580    #[tokio::test]
1581    async fn test_in_mem_sort() -> Result<()> {
1582        let task_ctx = Arc::new(TaskContext::default());
1583        let partitions = 4;
1584        let csv = test::scan_partitioned(partitions);
1585        let schema = csv.schema();
1586
1587        let sort_exec = Arc::new(SortExec::new(
1588            [PhysicalSortExpr {
1589                expr: col("i", &schema)?,
1590                options: SortOptions::default(),
1591            }]
1592            .into(),
1593            Arc::new(CoalescePartitionsExec::new(csv)),
1594        ));
1595
1596        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;
1597
1598        assert_eq!(result.len(), 1);
1599        assert_eq!(result[0].num_rows(), 400);
1600        assert_eq!(
1601            task_ctx.runtime_env().memory_pool.reserved(),
1602            0,
1603            "The sort should have returned all memory used back to the memory manager"
1604        );
1605
1606        Ok(())
1607    }
1608
1609    #[tokio::test]
1610    async fn test_sort_spill() -> Result<()> {
1611        // Trigger spills by sorting 100 batches under a tight memory limit.
1612        let session_config = SessionConfig::new();
1613        let sort_spill_reservation_bytes = session_config
1614            .options()
1615            .execution
1616            .sort_spill_reservation_bytes;
1617        let runtime = RuntimeEnvBuilder::new()
1618            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
1619            .build_arc()?;
1620        let task_ctx = Arc::new(
1621            TaskContext::default()
1622                .with_session_config(session_config)
1623                .with_runtime(runtime),
1624        );
1625
1626        // The input has 100 partitions, each partition has a batch containing 100 rows.
1627        // Each batch has a single Int32 column with values 0..100. The total size of the
1628        // input is roughly 40000 bytes.
1629        let partitions = 100;
1630        let input = test::scan_partitioned(partitions);
1631        let schema = input.schema();
1632
1633        let sort_exec = Arc::new(SortExec::new(
1634            [PhysicalSortExpr {
1635                expr: col("i", &schema)?,
1636                options: SortOptions::default(),
1637            }]
1638            .into(),
1639            Arc::new(CoalescePartitionsExec::new(input)),
1640        ));
1641
1642        let result = collect(
1643            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
1644            Arc::clone(&task_ctx),
1645        )
1646        .await?;
1647
1648        assert_eq!(result.len(), 2);
1649
1650        // Now, validate metrics
1651        let metrics = sort_exec.metrics().unwrap();
1652
1653        assert_eq!(metrics.output_rows().unwrap(), 10000);
1654        assert!(metrics.elapsed_compute().unwrap() > 0);
1655
1656        let spill_count = metrics.spill_count().unwrap();
1657        let spilled_rows = metrics.spilled_rows().unwrap();
1658        let spilled_bytes = metrics.spilled_bytes().unwrap();
1659        // Processing 40000 bytes of data using 12288 bytes of memory requires 3 spills
1660        // unless we do something really clever. It will spill roughly 9000+ rows and 40000
1661        // bytes. We leave a little wiggle room for the actual numbers.
1662        assert!((3..=10).contains(&spill_count));
1663        assert!((9000..=10000).contains(&spilled_rows));
1664        assert!((38000..=44000).contains(&spilled_bytes));
1665
1666        let columns = result[0].columns();
1667
1668        let i = as_primitive_array::<Int32Type>(&columns[0])?;
1669        assert_eq!(i.value(0), 0);
1670        assert_eq!(i.value(i.len() - 1), 81);
1671        assert_eq!(
1672            task_ctx.runtime_env().memory_pool.reserved(),
1673            0,
1674            "The sort should have returned all memory used back to the memory manager"
1675        );
1676
1677        Ok(())
1678    }
1679
1680    #[tokio::test]
1681    async fn test_batch_reservation_error() -> Result<()> {
1682        // Pick a memory limit and sort_spill_reservation that make the first batch reservation fail.
1683        let merge_reservation: usize = 0; // Set to 0 for simplicity
1684
1685        let session_config =
1686            SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
1687
1688        let plan = test::scan_partitioned(1);
1689
1690        // Read the first record batch to determine the actual memory requirement
1691        let expected_batch_reservation = {
1692            let temp_ctx = Arc::new(TaskContext::default());
1693            let mut stream = plan.execute(0, Arc::clone(&temp_ctx))?;
1694            let first_batch = stream.next().await.unwrap()?;
1695            get_reserved_bytes_for_record_batch(&first_batch)?
1696        };
1697
1698        // Set memory limit just short of what we need
1699        let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;
1700
1701        let runtime = RuntimeEnvBuilder::new()
1702            .with_memory_limit(memory_limit, 1.0)
1703            .build_arc()?;
1704        let task_ctx = Arc::new(
1705            TaskContext::default()
1706                .with_session_config(session_config)
1707                .with_runtime(runtime),
1708        );
1709
1710        // Verify that our memory limit is insufficient
1711        {
1712            let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
1713            let first_batch = stream.next().await.unwrap()?;
1714            let batch_reservation = get_reserved_bytes_for_record_batch(&first_batch)?;
1715
1716            assert_eq!(batch_reservation, expected_batch_reservation);
1717            assert!(memory_limit < (merge_reservation + batch_reservation));
1718        }
1719
1720        let sort_exec = Arc::new(SortExec::new(
1721            [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
1722            plan,
1723        ));
1724
1725        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;
1726
1727        let err = result.unwrap_err();
1728        assert!(
1729            matches!(err, DataFusionError::Context(..)),
1730            "Assertion failed: expected a Context error, but got: {err:?}"
1731        );
1732
1733        // Assert that the context error is wrapping a resources exhausted error.
1734        assert!(
1735            matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
1736            "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
1737        );
1738
1739        Ok(())
1740    }
1741
1742    #[tokio::test]
1743    async fn test_sort_spill_utf8_strings() -> Result<()> {
1744        let session_config = SessionConfig::new()
1745            .with_batch_size(100)
1746            .with_sort_in_place_threshold_bytes(20 * 1024)
1747            .with_sort_spill_reservation_bytes(100 * 1024);
1748        let runtime = RuntimeEnvBuilder::new()
1749            .with_memory_limit(500 * 1024, 1.0)
1750            .build_arc()?;
1751        let task_ctx = Arc::new(
1752            TaskContext::default()
1753                .with_session_config(session_config)
1754                .with_runtime(runtime),
1755        );
1756
1757        // The input has 200 partitions, each partition has a batch containing 100 rows.
1758        // Each batch has a single Utf8 column whose string values are roughly 42 bytes each.
1759        // The total size of the input is roughly 840 KB.
1760        let input = test::scan_partitioned_utf8(200);
1761        let schema = input.schema();
1762
1763        let sort_exec = Arc::new(SortExec::new(
1764            [PhysicalSortExpr {
1765                expr: col("i", &schema)?,
1766                options: SortOptions::default(),
1767            }]
1768            .into(),
1769            Arc::new(CoalescePartitionsExec::new(input)),
1770        ));
1771
1772        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1773
1774        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
1775        assert_eq!(num_rows, 20000);
1776
1777        // Now, validate metrics
1778        let metrics = sort_exec.metrics().unwrap();
1779
1780        assert_eq!(metrics.output_rows().unwrap(), 20000);
1781        assert!(metrics.elapsed_compute().unwrap() > 0);
1782
1783        let spill_count = metrics.spill_count().unwrap();
1784        let spilled_rows = metrics.spilled_rows().unwrap();
1785        let spilled_bytes = metrics.spilled_bytes().unwrap();
1786
1787        // This test case is processing 840KB of data using 400KB of memory. Note
1788        // that buffered batches can't be dropped until all sorted batches are
1789        // generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
1790        // batches.
1791        // The number of spills is roughly calculated as:
1792        //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
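        // For example (rough numbers for this config): 200 input batches with a
        // 100 KB reservation and ~4 KB of string data per batch gives
        // 200 / (100 KB / ~4 KB) ≈ 8 spills, the upper end of the asserted range.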
1793
1794        // If this assertion fails with a large spill count, make sure the following
1795        // case does not happen:
1796        // During external sorting, one sorted run should be spilled to disk in a
1797        // single file; due to the memory limit we might need to append to that file
1798        // multiple times to spill all the data. Make sure we're not writing each
1799        // append as a separate file.
1800        assert!((4..=8).contains(&spill_count));
1801        assert!((15000..=20000).contains(&spilled_rows));
1802        assert!((900000..=1000000).contains(&spilled_bytes));
1803
1804        // Verify that the result is sorted
1805        let concatenated_result = concat_batches(&schema, &result)?;
1806        let columns = concatenated_result.columns();
1807        let string_array = as_string_array(&columns[0]);
1808        for i in 0..string_array.len() - 1 {
1809            assert!(string_array.value(i) <= string_array.value(i + 1));
1810        }
1811
1812        assert_eq!(
1813            task_ctx.runtime_env().memory_pool.reserved(),
1814            0,
1815            "The sort should have returned all memory used back to the memory manager"
1816        );
1817
1818        Ok(())
1819    }
1820
1821    #[tokio::test]
1822    async fn test_sort_fetch_memory_calculation() -> Result<()> {
1823        // This test mirrors the memory sizing of the spill test above.
1824        let avg_batch_size = 400;
1825        let partitions = 4;
1826
1827        // A tuple of (fetch, expect_spillage)
1828        let test_options = vec![
1829            // Since we don't have a limit, and the memory is less than the total size of
1830            // all the batches we are processing, we expect it to spill.
1831            (None, true),
1832            // When we have a limit, however, the buffered batches should fit in memory
1833            // since their size is much lower than the total size of the input batches.
1834            (Some(1), false),
1835        ];
1836
1837        for (fetch, expect_spillage) in test_options {
1838            let session_config = SessionConfig::new();
1839            let sort_spill_reservation_bytes = session_config
1840                .options()
1841                .execution
1842                .sort_spill_reservation_bytes;
1843
1844            let runtime = RuntimeEnvBuilder::new()
1845                .with_memory_limit(
1846                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1847                    1.0,
1848                )
1849                .build_arc()?;
1850            let task_ctx = Arc::new(
1851                TaskContext::default()
1852                    .with_runtime(runtime)
1853                    .with_session_config(session_config),
1854            );
1855
1856            let csv = test::scan_partitioned(partitions);
1857            let schema = csv.schema();
1858
1859            let sort_exec = Arc::new(
1860                SortExec::new(
1861                    [PhysicalSortExpr {
1862                        expr: col("i", &schema)?,
1863                        options: SortOptions::default(),
1864                    }]
1865                    .into(),
1866                    Arc::new(CoalescePartitionsExec::new(csv)),
1867                )
1868                .with_fetch(fetch),
1869            );
1870
1871            let result =
1872                collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
1873            assert_eq!(result.len(), 1);
1874
1875            let metrics = sort_exec.metrics().unwrap();
1876            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
1877            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
1878        }
1879        Ok(())
1880    }
1881
1882    #[tokio::test]
1883    async fn test_sort_memory_reduction_per_batch() -> Result<()> {
1884        // This test verifies that memory reservation is reduced for every batch emitted
1885        // during the sort process. This is important to ensure we don't hold onto
1886        // memory longer than necessary.
1887
1888        // Create a large enough batch that will be split into multiple output batches
1889        let batch_size = 50; // Small batch size to force multiple output batches
1890        let num_rows = 1000; // Create enough data for multiple batches
1891
1892        let task_ctx = Arc::new(
1893            TaskContext::default().with_session_config(
1894                SessionConfig::new()
1895                    .with_batch_size(batch_size)
1896                    .with_sort_in_place_threshold_bytes(usize::MAX), // Ensure we don't concat batches
1897            ),
1898        );
1899
1900        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1901
1902        // Create unsorted data
1903        let mut values: Vec<i32> = (0..num_rows).collect();
1904        values.reverse();
1905
1906        let input_batch = RecordBatch::try_new(
1907            Arc::clone(&schema),
1908            vec![Arc::new(Int32Array::from(values))],
1909        )?;
1910
1911        let batches = vec![input_batch];
1912
1913        let sort_exec = Arc::new(SortExec::new(
1914            [PhysicalSortExpr {
1915                expr: Arc::new(Column::new("a", 0)),
1916                options: SortOptions::default(),
1917            }]
1918            .into(),
1919            TestMemoryExec::try_new_exec(
1920                std::slice::from_ref(&batches),
1921                Arc::clone(&schema),
1922                None,
1923            )?,
1924        ));
1925
1926        let mut stream = sort_exec.execute(0, Arc::clone(&task_ctx))?;
1927
1928        let mut previous_reserved = task_ctx.runtime_env().memory_pool.reserved();
1929        let mut batch_count = 0;
1930
1931        // Collect batches and verify memory is reduced with each batch
1932        while let Some(result) = stream.next().await {
1933            let batch = result?;
1934            batch_count += 1;
1935
1936            // Verify we got a non-empty batch
1937            assert!(batch.num_rows() > 0, "Batch should not be empty");
1938
1939            let current_reserved = task_ctx.runtime_env().memory_pool.reserved();
1940
1941            // After the first batch, memory should be reducing or staying the same
1942            // (it should not increase as we emit batches)
1943            if batch_count > 1 {
1944                assert!(
1945                    current_reserved <= previous_reserved,
1946                    "Memory reservation should decrease or stay same as batches are emitted. \
1947                     Batch {batch_count}: previous={previous_reserved}, current={current_reserved}"
1948                );
1949            }
1950
1951            previous_reserved = current_reserved;
1952        }
1953
1954        assert!(
1955            batch_count > 1,
1956            "Expected multiple batches to be emitted, got {batch_count}"
1957        );
1958
1959        // Verify all memory is returned at the end
1960        assert_eq!(
1961            task_ctx.runtime_env().memory_pool.reserved(),
1962            0,
1963            "All memory should be returned after consuming all batches"
1964        );
1965
1966        Ok(())
1967    }
1968
1969    #[tokio::test]
1970    async fn test_sort_metadata() -> Result<()> {
1971        let task_ctx = Arc::new(TaskContext::default());
1972        let field_metadata: HashMap<String, String> =
1973            vec![("foo".to_string(), "bar".to_string())]
1974                .into_iter()
1975                .collect();
1976        let schema_metadata: HashMap<String, String> =
1977            vec![("baz".to_string(), "barf".to_string())]
1978                .into_iter()
1979                .collect();
1980
1981        let mut field = Field::new("field_name", DataType::UInt64, true);
1982        field.set_metadata(field_metadata.clone());
1983        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1984        let schema = Arc::new(schema);
1985
1986        let data: ArrayRef =
1987            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1988
1989        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1990        let input =
1991            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1992
1993        let sort_exec = Arc::new(SortExec::new(
1994            [PhysicalSortExpr {
1995                expr: col("field_name", &schema)?,
1996                options: SortOptions::default(),
1997            }]
1998            .into(),
1999            input,
2000        ));
2001
2002        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
2003
2004        let expected_data: ArrayRef =
2005            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
2006        let expected_batch =
2007            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
2008
2009        // Data is correct
2010        assert_eq!(&vec![expected_batch], &result);
2011
2012        // explicitly ensure the metadata is present
2013        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
2014        assert_eq!(result[0].schema().metadata(), &schema_metadata);
2015
2016        Ok(())
2017    }
2018
2019    #[tokio::test]
2020    async fn test_lex_sort_by_mixed_types() -> Result<()> {
2021        let task_ctx = Arc::new(TaskContext::default());
2022        let schema = Arc::new(Schema::new(vec![
2023            Field::new("a", DataType::Int32, true),
2024            Field::new(
2025                "b",
2026                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
2027                true,
2028            ),
2029        ]));
2030
2031        // define data.
2032        let batch = RecordBatch::try_new(
2033            Arc::clone(&schema),
2034            vec![
2035                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
2036                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
2037                    Some(vec![Some(3)]),
2038                    Some(vec![Some(1)]),
2039                    Some(vec![Some(6), None]),
2040                    Some(vec![Some(5)]),
2041                ])),
2042            ],
2043        )?;
2044
2045        let sort_exec = Arc::new(SortExec::new(
2046            [
2047                PhysicalSortExpr {
2048                    expr: col("a", &schema)?,
2049                    options: SortOptions {
2050                        descending: false,
2051                        nulls_first: true,
2052                    },
2053                },
2054                PhysicalSortExpr {
2055                    expr: col("b", &schema)?,
2056                    options: SortOptions {
2057                        descending: true,
2058                        nulls_first: false,
2059                    },
2060                },
2061            ]
2062            .into(),
2063            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
2064        ));
2065
2066        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
2067        assert_eq!(
2068            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
2069            *sort_exec.schema().field(1).data_type()
2070        );
2071
2072        let result: Vec<RecordBatch> =
2073            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
2074        let metrics = sort_exec.metrics().unwrap();
2075        assert!(metrics.elapsed_compute().unwrap() > 0);
2076        assert_eq!(metrics.output_rows().unwrap(), 4);
2077        assert_eq!(result.len(), 1);
2078
2079        let expected = RecordBatch::try_new(
2080            schema,
2081            vec![
2082                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
2083                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
2084                    Some(vec![Some(1)]),
2085                    Some(vec![Some(6), None]),
2086                    Some(vec![Some(5)]),
2087                    Some(vec![Some(3)]),
2088                ])),
2089            ],
2090        )?;
2091
2092        assert_eq!(expected, result[0]);
2093
2094        Ok(())
2095    }
2096
2097    #[tokio::test]
2098    async fn test_lex_sort_by_float() -> Result<()> {
2099        let task_ctx = Arc::new(TaskContext::default());
2100        let schema = Arc::new(Schema::new(vec![
2101            Field::new("a", DataType::Float32, true),
2102            Field::new("b", DataType::Float64, true),
2103        ]));
2104
2105        // define data.
2106        let batch = RecordBatch::try_new(
2107            Arc::clone(&schema),
2108            vec![
2109                Arc::new(Float32Array::from(vec![
2110                    Some(f32::NAN),
2111                    None,
2112                    None,
2113                    Some(f32::NAN),
2114                    Some(1.0_f32),
2115                    Some(1.0_f32),
2116                    Some(2.0_f32),
2117                    Some(3.0_f32),
2118                ])),
2119                Arc::new(Float64Array::from(vec![
2120                    Some(200.0_f64),
2121                    Some(20.0_f64),
2122                    Some(10.0_f64),
2123                    Some(100.0_f64),
2124                    Some(f64::NAN),
2125                    None,
2126                    None,
2127                    Some(f64::NAN),
2128                ])),
2129            ],
2130        )?;
2131
2132        let sort_exec = Arc::new(SortExec::new(
2133            [
2134                PhysicalSortExpr {
2135                    expr: col("a", &schema)?,
2136                    options: SortOptions {
2137                        descending: true,
2138                        nulls_first: true,
2139                    },
2140                },
2141                PhysicalSortExpr {
2142                    expr: col("b", &schema)?,
2143                    options: SortOptions {
2144                        descending: false,
2145                        nulls_first: false,
2146                    },
2147                },
2148            ]
2149            .into(),
2150            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
2151        ));
2152
2153        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
2154        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
2155
2156        let result: Vec<RecordBatch> =
2157            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
2158        let metrics = sort_exec.metrics().unwrap();
2159        assert!(metrics.elapsed_compute().unwrap() > 0);
2160        assert_eq!(metrics.output_rows().unwrap(), 8);
2161        assert_eq!(result.len(), 1);
2162
2163        let columns = result[0].columns();
2164
2165        assert_eq!(DataType::Float32, *columns[0].data_type());
2166        assert_eq!(DataType::Float64, *columns[1].data_type());
2167
2168        let a = as_primitive_array::<Float32Type>(&columns[0])?;
2169        let b = as_primitive_array::<Float64Type>(&columns[1])?;
2170
2171        // convert result to strings to allow comparing to expected result containing NaN
2172        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
2173            .map(|i| {
2174                let aval = if a.is_valid(i) {
2175                    Some(a.value(i).to_string())
2176                } else {
2177                    None
2178                };
2179                let bval = if b.is_valid(i) {
2180                    Some(b.value(i).to_string())
2181                } else {
2182                    None
2183                };
2184                (aval, bval)
2185            })
2186            .collect();
2187
2188        let expected: Vec<(Option<String>, Option<String>)> = vec![
2189            (None, Some("10".to_owned())),
2190            (None, Some("20".to_owned())),
2191            (Some("NaN".to_owned()), Some("100".to_owned())),
2192            (Some("NaN".to_owned()), Some("200".to_owned())),
2193            (Some("3".to_owned()), Some("NaN".to_owned())),
2194            (Some("2".to_owned()), None),
2195            (Some("1".to_owned()), Some("NaN".to_owned())),
2196            (Some("1".to_owned()), None),
2197        ];
2198
2199        assert_eq!(expected, result);
2200
2201        Ok(())
2202    }
2203
2204    #[tokio::test]
2205    async fn test_drop_cancel() -> Result<()> {
2206        let task_ctx = Arc::new(TaskContext::default());
2207        let schema =
2208            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2209
2210        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2211        let refs = blocking_exec.refs();
2212        let sort_exec = Arc::new(SortExec::new(
2213            [PhysicalSortExpr {
2214                expr: col("a", &schema)?,
2215                options: SortOptions::default(),
2216            }]
2217            .into(),
2218            blocking_exec,
2219        ));
2220
2221        let fut = collect(sort_exec, Arc::clone(&task_ctx));
2222        let mut fut = fut.boxed();
2223
2224        assert_is_pending(&mut fut);
2225        drop(fut);
2226        assert_strong_count_converges_to_zero(refs).await;
2227
2228        assert_eq!(
2229            task_ctx.runtime_env().memory_pool.reserved(),
2230            0,
2231            "The sort should have returned all memory used back to the memory manager"
2232        );
2233
2234        Ok(())
2235    }
2236
2237    #[test]
2238    fn test_empty_sort_batch() {
2239        let schema = Arc::new(Schema::empty());
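        // A batch with no columns can still report a row count through
        // `RecordBatchOptions::with_row_count`; the sort must preserve it.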
2240        let options = RecordBatchOptions::new().with_row_count(Some(1));
2241        let batch =
2242            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
2243                .unwrap();
2244
2245        let expressions = [PhysicalSortExpr {
2246            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2247            options: SortOptions::default(),
2248        }]
2249        .into();
2250
2251        let result = sort_batch(&batch, &expressions, None).unwrap();
2252        assert_eq!(result.num_rows(), 1);
2253    }
2254
2255    #[tokio::test]
2256    async fn topk_unbounded_source() -> Result<()> {
2257        let task_ctx = Arc::new(TaskContext::default());
2258        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
2259        let source = SortedUnboundedExec {
2260            schema: schema.clone(),
2261            batch_size: 2,
2262            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
2263        };
2264        let mut plan = SortExec::new(
2265            [PhysicalSortExpr::new_default(Arc::new(Column::new(
2266                "c1", 0,
2267            )))]
2268            .into(),
2269            Arc::new(source),
2270        );
2271        plan = plan.with_fetch(Some(9));
2272
2273        let batches = collect(Arc::new(plan), task_ctx).await?;
2274        assert_snapshot!(batches_to_string(&batches), @r"
2275        +----+
2276        | c1 |
2277        +----+
2278        | 0  |
2279        | 1  |
2280        | 2  |
2281        | 3  |
2282        | 4  |
2283        | 5  |
2284        | 6  |
2285        | 7  |
2286        | 8  |
2287        +----+
2288        ");
2289        Ok(())
2290    }
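
    // A minimal extra sketch (not part of the original suite) of the TopK path in
    // `execute`: `TestMemoryExec` declares no output ordering, so attaching a fetch
    // routes this plan through the `(false, Some(fetch))` branch above.
    #[tokio::test]
    async fn topk_bounded_source_sketch() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![3, 1, 2]))],
        )?;
        let input =
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
        let sort_exec = SortExec::new(
            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
            input,
        )
        .with_fetch(Some(2));

        let result = collect(Arc::new(sort_exec), task_ctx).await?;
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
        // Only the two smallest values (1 and 2) survive the fetch.
        assert_eq!(total_rows, 2);
        Ok(())
    }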
2291
2292    #[tokio::test]
2293    async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2294        let batch_size = 100;
2295
2296        let create_task_ctx = |_: &[RecordBatch]| {
2297            TaskContext::default().with_session_config(
2298                SessionConfig::new()
2299                    .with_batch_size(batch_size)
2300                    .with_sort_in_place_threshold_bytes(usize::MAX),
2301            )
2302        };
2303
2304        // Smaller than the batch size; requires more than a single input batch to fill the requested batch size
2305        test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2306
2307        // Not evenly divisible by batch size
2308        test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2309
2310        // Evenly divisible by the batch size and spans more than 2 output batches
2311        test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2312
2313        Ok(())
2314    }
2315
2316    #[tokio::test]
2317    async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place()
2318    -> Result<()> {
2319        let batch_size = 100;
2320
2321        let create_task_ctx = |_: &[RecordBatch]| {
2322            TaskContext::default().with_session_config(
2323                SessionConfig::new()
2324                    .with_batch_size(batch_size)
2325                    .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2326            )
2327        };
2328
2329        // Smaller than the batch size; requires more than a single input batch to fill the requested batch size
2330        {
2331            let metrics =
2332                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2333
2334            assert_eq!(
2335                metrics.spill_count(),
2336                Some(0),
2337                "Expected no spills when sorting in place"
2338            );
2339        }
2340
2341        // Not evenly divisible by batch size
2342        {
2343            let metrics =
2344                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2345
2346            assert_eq!(
2347                metrics.spill_count(),
2348                Some(0),
2349                "Expected no spills when sorting in place"
2350            );
2351        }
2352
2353        // Evenly divisible by the batch size and spans more than 2 output batches
2354        {
2355            let metrics =
2356                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2357
2358            assert_eq!(
2359                metrics.spill_count(),
2360                Some(0),
2361                "Expected no spills when sorting in place"
2362            );
2363        }
2364
2365        Ok(())
2366    }
2367
2368    #[tokio::test]
2369    async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch()
2370    -> Result<()> {
2371        let batch_size = 100;
2372
2373        let create_task_ctx = |_: &[RecordBatch]| {
2374            TaskContext::default()
2375                .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2376        };
2377
2378        // Smaller than the batch size; requires more than a single input batch to fill the requested batch size
2379        {
2380            let metrics = test_sort_output_batch_size(
2381                // Single batch
2382                1,
2383                batch_size / 4,
2384                create_task_ctx,
2385            )
2386            .await?;
2387
2388            assert_eq!(
2389                metrics.spill_count(),
2390                Some(0),
2391                "Expected no spills when sorting in place"
2392            );
2393        }
2394
2395        // Not evenly divisible by batch size
2396        {
2397            let metrics = test_sort_output_batch_size(
2398                // Single batch
2399                1,
2400                batch_size + 7,
2401                create_task_ctx,
2402            )
2403            .await?;
2404
2405            assert_eq!(
2406                metrics.spill_count(),
2407                Some(0),
2408                "Expected no spills when sorting in place"
2409            );
2410        }
2411
2412        // Evenly divisible by the batch size and spans more than 2 output batches
2413        {
2414            let metrics = test_sort_output_batch_size(
2415                // Single batch
2416                1,
2417                batch_size * 3,
2418                create_task_ctx,
2419            )
2420            .await?;
2421
2422            assert_eq!(
2423                metrics.spill_count(),
2424                Some(0),
2425                "Expected no spills when sorting in place"
2426            );
2427        }
2428
2429        Ok(())
2430    }
2431
2432    #[tokio::test]
2433    async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill()
2434    -> Result<()> {
2435        let batch_size = 100;
2436
2437        let create_task_ctx = |generated_batches: &[RecordBatch]| {
2438            let batches_memory = generated_batches
2439                .iter()
2440                .map(|b| b.get_array_memory_size())
2441                .sum::<usize>();
2442
2443            TaskContext::default()
2444                .with_session_config(
2445                    SessionConfig::new()
2446                        .with_batch_size(batch_size)
2447                        // To make sure there is no in-place sorting
2448                        .with_sort_in_place_threshold_bytes(1)
2449                        .with_sort_spill_reservation_bytes(1),
2450                )
2451                .with_runtime(
2452                    RuntimeEnvBuilder::default()
2453                        .with_memory_limit(batches_memory, 1.0)
2454                        .build_arc()
2455                        .unwrap(),
2456                )
2457        };
2458
2459        // Smaller than the batch size; requires more than a single input batch to fill the requested batch size
2460        {
2461            let metrics =
2462                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2463
2464            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2465        }
2466
2467        // Not evenly divisible by batch size
2468        {
2469            let metrics =
2470                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2471
2472            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2473        }
2474
2475        // Evenly divisible by the batch size and spans more than 2 output batches
2476        {
2477            let metrics =
2478                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2479
2480            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2481        }
2482
2483        Ok(())
2484    }
2485
2486    async fn test_sort_output_batch_size(
2487        number_of_batches: usize,
2488        batch_size_to_generate: usize,
2489        create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2490    ) -> Result<MetricsSet> {
2491        let batches = (0..number_of_batches)
2492            .map(|_| make_partition(batch_size_to_generate as i32))
2493            .collect::<Vec<_>>();
2494        let task_ctx = create_task_ctx(batches.as_slice());
2495
2496        let expected_batch_size = task_ctx.session_config().batch_size();
2497
2498        let (mut output_batches, metrics) =
2499            run_sort_on_input(task_ctx, "i", batches).await?;
2500
2501        let last_batch = output_batches.pop().unwrap();
2502
2503        for batch in output_batches {
2504            assert_eq!(batch.num_rows(), expected_batch_size);
2505        }
2506
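        // E.g. 10 input batches of 25 rows = 250 rows with a target of 100 yields
        // output batches of 100, 100 and 50: all batches but the last are full,
        // and the last holds the remainder (or a full batch when evenly divisible).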
2507        let mut last_expected_batch_size =
2508            (batch_size_to_generate * number_of_batches) % expected_batch_size;
2509        if last_expected_batch_size == 0 {
2510            last_expected_batch_size = expected_batch_size;
2511        }
2512        assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2513
2514        Ok(metrics)
2515    }
2516
2517    async fn run_sort_on_input(
2518        task_ctx: TaskContext,
2519        order_by_col: &str,
2520        batches: Vec<RecordBatch>,
2521    ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2522        let task_ctx = Arc::new(task_ctx);
2523
2525        let schema = batches[0].schema();
2526        let ordering: LexOrdering = [PhysicalSortExpr {
2527            expr: col(order_by_col, &schema)?,
2528            options: SortOptions {
2529                descending: false,
2530                nulls_first: true,
2531            },
2532        }]
2533        .into();
2534        let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2535            ordering.clone(),
2536            TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2537        ));
2538
2539        let sorted_batches =
2540            collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2541
2542        let metrics = sort_exec.metrics().expect("sort should have metrics");
2543
2544        // assert output
2545        {
2546            let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2547            let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2548
2549            let sorted_batches_concat =
2550                concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2551
2552            assert_eq!(sorted_input_batch, sorted_batches_concat);
2553        }
2554
2555        Ok((sorted_batches, metrics))
2556    }
2557
2558    #[tokio::test]
2559    async fn test_sort_batch_chunked_basic() -> Result<()> {
2560        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2561
2562        // Create a batch with 1000 rows
2563        let mut values: Vec<i32> = (0..1000).collect();
2564        // Reverse to make it unsorted
2565        values.reverse();
2566
2567        let batch = RecordBatch::try_new(
2568            Arc::clone(&schema),
2569            vec![Arc::new(Int32Array::from(values))],
2570        )?;
2571
2572        let expressions: LexOrdering =
2573            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2574
2575        // Sort with batch_size = 250
2576        let result_batches = sort_batch_chunked(&batch, &expressions, 250)?;
2577
2578        // Verify 4 batches are returned
2579        assert_eq!(result_batches.len(), 4);
2580
2581        // Verify each batch has <= 250 rows
2582        let mut total_rows = 0;
2583        for (i, batch) in result_batches.iter().enumerate() {
2584            assert!(
2585                batch.num_rows() <= 250,
2586                "Batch {} has {} rows, expected <= 250",
2587                i,
2588                batch.num_rows()
2589            );
2590            total_rows += batch.num_rows();
2591        }
2592
2593        // Verify total row count matches input
2594        assert_eq!(total_rows, 1000);
2595
2596        // Verify data is correctly sorted across all chunks
2597        let concatenated = concat_batches(&schema, &result_batches)?;
2598        let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2599        for i in 0..array.len() - 1 {
2600            assert!(
2601                array.value(i) <= array.value(i + 1),
2602                "Array not sorted at position {}: {} > {}",
2603                i,
2604                array.value(i),
2605                array.value(i + 1)
2606            );
2607        }
2608        assert_eq!(array.value(0), 0);
2609        assert_eq!(array.value(array.len() - 1), 999);
2610
2611        Ok(())
2612    }
2613
2614    #[tokio::test]
2615    async fn test_sort_batch_chunked_smaller_than_batch_size() -> Result<()> {
2616        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2617
2618        // Create a batch with 50 rows
2619        let values: Vec<i32> = (0..50).rev().collect();
2620        let batch = RecordBatch::try_new(
2621            Arc::clone(&schema),
2622            vec![Arc::new(Int32Array::from(values))],
2623        )?;
2624
2625        let expressions: LexOrdering =
2626            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2627
2628        // Sort with batch_size = 100
2629        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2630
2631        // Should return exactly 1 batch
2632        assert_eq!(result_batches.len(), 1);
2633        assert_eq!(result_batches[0].num_rows(), 50);
2634
2635        // Verify it's correctly sorted
2636        let array = as_primitive_array::<Int32Type>(result_batches[0].column(0))?;
2637        for i in 0..array.len() - 1 {
2638            assert!(array.value(i) <= array.value(i + 1));
2639        }
2640        assert_eq!(array.value(0), 0);
2641        assert_eq!(array.value(49), 49);
2642
2643        Ok(())
2644    }
2645
2646    #[tokio::test]
2647    async fn test_sort_batch_chunked_exact_multiple() -> Result<()> {
2648        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2649
2650        // Create a batch with 1000 rows
2651        let values: Vec<i32> = (0..1000).rev().collect();
2652        let batch = RecordBatch::try_new(
2653            Arc::clone(&schema),
2654            vec![Arc::new(Int32Array::from(values))],
2655        )?;
2656
2657        let expressions: LexOrdering =
2658            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2659
2660        // Sort with batch_size = 100
2661        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2662
2663        // Should return exactly 10 batches of 100 rows each
2664        assert_eq!(result_batches.len(), 10);
2665        for batch in &result_batches {
2666            assert_eq!(batch.num_rows(), 100);
2667        }
2668
2669        // Verify sorted correctly across all batches
2670        let concatenated = concat_batches(&schema, &result_batches)?;
2671        let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2672        for i in 0..array.len() - 1 {
2673            assert!(array.value(i) <= array.value(i + 1));
2674        }
2675
2676        Ok(())
2677    }
2678
2679    #[tokio::test]
2680    async fn test_sort_batch_chunked_empty_batch() -> Result<()> {
2681        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2682
2683        let batch = RecordBatch::new_empty(Arc::clone(&schema));
2684
2685        let expressions: LexOrdering =
2686            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2687
2688        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2689
2690        // Empty input produces no output batches (0 chunks)
2691        assert_eq!(result_batches.len(), 0);
2692
2693        Ok(())
2694    }
2695
2696    #[tokio::test]
2697    async fn test_get_reserved_bytes_for_record_batch_with_sliced_batches() -> Result<()>
2698    {
2699        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2700
2701        // Create a larger batch then slice it
2702        let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
2703        let sliced_array = large_array.slice(100, 50); // Take 50 elements starting at 100
2704
2705        let sliced_batch =
2706            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(sliced_array)])?;
2707        let batch =
2708            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(large_array)])?;
2709
2710        let sliced_reserved = get_reserved_bytes_for_record_batch(&sliced_batch)?;
2711        let reserved = get_reserved_bytes_for_record_batch(&batch)?;
2712
2713        // The reserved memory for the sliced batch should be less than that of the full batch
2714        assert!(reserved > sliced_reserved);
2715
2716        Ok(())
2717    }
2718}