// datafusion_physical_plan/sorts/sort.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Sort that deals with an arbitrary size of the input.
19//! It will do in-memory sorting if it has enough memory budget
20//! but spills to disk if needed.
21
22use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28
29use crate::common::spawn_buffered;
30use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
31use crate::expressions::PhysicalSortExpr;
32use crate::filter_pushdown::{
33    ChildFilterDescription, FilterDescription, FilterPushdownPhase,
34};
35use crate::limit::LimitStream;
36use crate::metrics::{
37    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
38};
39use crate::projection::{ProjectionExec, make_with_child, update_ordering};
40use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
41use crate::spill::get_record_batch_memory_size;
42use crate::spill::in_progress_spill_file::InProgressSpillFile;
43use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
44use crate::stream::RecordBatchStreamAdapter;
45use crate::stream::ReservationStream;
46use crate::topk::TopK;
47use crate::topk::TopKDynamicFilters;
48use crate::{
49    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
50    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
51    Statistics,
52};
53
54use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
55use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
56use arrow::datatypes::SchemaRef;
57use datafusion_common::config::SpillCompression;
58use datafusion_common::{
59    DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
60    unwrap_or_internal_err,
61};
62use datafusion_execution::TaskContext;
63use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
64use datafusion_execution::runtime_env::RuntimeEnv;
65use datafusion_physical_expr::LexOrdering;
66use datafusion_physical_expr::PhysicalExpr;
67use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
68
69use futures::{StreamExt, TryStreamExt};
70use log::{debug, trace};
71
/// Execution metrics tracked by [`ExternalSorter`]: baseline operator
/// metrics plus counters specific to spilling.
struct ExternalSorterMetrics {
    /// Baseline operator metrics (elapsed compute time, output row count, ...)
    baseline: BaselineMetrics,

    /// Counters for data spilled to disk (bytes, rows, spill file count)
    spill_metrics: SpillMetrics,
}
78
79impl ExternalSorterMetrics {
80    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
81        Self {
82            baseline: BaselineMetrics::new(metrics, partition),
83            spill_metrics: SpillMetrics::new(metrics, partition),
84        }
85    }
86}
87
88/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
89/// a total order. Depending on the input size and memory manager
90/// configuration, writes intermediate results to disk ("spills")
91/// using Arrow IPC format.
92///
93/// # Algorithm
94///
95/// 1. get a non-empty new batch from input
96///
97/// 2. check with the memory manager there is sufficient space to
98///    buffer the batch in memory.
99///
100/// 2.1 if memory is sufficient, buffer batch in memory, go to 1.
101///
102/// 2.2 if no more memory is available, sort all buffered batches and
103///     spill to file.  buffer the next batch in memory, go to 1.
104///
105/// 3. when input is exhausted, merge all in memory batches and spills
106///    to get a total order.
107///
108/// # When data fits in available memory
109///
110/// If there is sufficient memory, data is sorted in memory to produce the output
111///
112/// ```text
113///    ┌─────┐
114///    │  2  │
115///    │  3  │
116///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
117///    │  4  │
118///    │  2  │                  │
119///    └─────┘                  ▼
120///    ┌─────┐
121///    │  1  │              In memory
122///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
123///    │  1  │
124///    └─────┘                  ▲
125///      ...                    │
126///
127///    ┌─────┐                  │
128///    │  4  │
129///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
130///    └─────┘
131///
132/// in_mem_batches
133/// ```
134///
135/// # When data does not fit in available memory
136///
137///  When memory is exhausted, data is first sorted and written to one
138///  or more spill files on disk:
139///
140/// ```text
141///    ┌─────┐                               .─────────────────.
142///    │  2  │                              (                   )
143///    │  3  │                              │`─────────────────'│
144///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
145///    │  4  │             │                │  │ 1  │░          │
146///    │  2  │                              │  │... │░          │
147///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
148///    ┌─────┐                              │  └────┘░    1  │░ │
149///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
150///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
151///    │  1  │     and write to file        │           │    ░░ │
152///    └─────┘                              │             4  │░ │
153///      ...               ▲                │           └░─░─░░ │
154///                        │                │            ░░░░░░ │
155///    ┌─────┐                              │.─────────────────.│
156///    │  4  │             │                (                   )
157///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
158///    └─────┘
159///
160/// in_mem_batches                                  spills
161///                                         (file on disk in Arrow
162///                                               IPC format)
163/// ```
164///
165/// Once the input is completely read, the spill files are read and
166/// merged with any in memory batches to produce a single total sorted
167/// output:
168///
169/// ```text
170///   .─────────────────.
171///  (                   )
172///  │`─────────────────'│
173///  │  ┌────┐           │
174///  │  │ 1  │░          │
175///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
176///  │  │ 4  │░ ┌────┐   │           │
177///  │  └────┘░ │ 1  │░  │           ▼
178///  │   ░░░░░░ │    │░  │
179///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
180///  │          │    │░  │
181///  │          │ 4  │░  │           ▲
182///  │          └────┘░  │           │
183///  │           ░░░░░░  │
184///  │.─────────────────.│           │
185///  (                   )
186///   `─────────────────'            │
187///         spills
188///                                  │
189///
190///                                  │
191///
192///     ┌─────┐                      │
193///     │  1  │
194///     │  4  │─ ─ ─ ─               │
195///     └─────┘       │
196///       ...                   In memory
197///                   └ ─ ─ ─▶  sort/merge
198///     ┌─────┐
199///     │  4  │                      ▲
200///     │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
201///     └─────┘
202///
203///  in_mem_batches
204/// ```
struct ExternalSorter {
    // ========================================================================
    // PROPERTIES:
    // Fields that define the sorter's configuration and remain constant
    // ========================================================================
    /// Schema of the output (and the input)
    schema: SchemaRef,
    /// Sort expressions
    expr: LexOrdering,
    /// The target number of rows for output batches
    batch_size: usize,
    /// If the total size of buffered in-memory batches is below this size,
    /// the data will be concatenated and sorted in place rather than
    /// sort/merged.
    sort_in_place_threshold_bytes: usize,

    // ========================================================================
    // STATE BUFFERS:
    // Fields that hold intermediate data during sorting
    // ========================================================================
    /// Unsorted input batches stored in the memory buffer
    in_mem_batches: Vec<RecordBatch>,

    /// During external sorting, in-memory intermediate data will be appended to
    /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
    ///
    /// this is a tuple of:
    /// 1. `InProgressSpillFile` - the file that is being written to
    /// 2. `max_record_batch_memory` - the maximum memory usage of a single batch in this spill file.
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    /// If data has previously been spilled, the locations of the spill files (in
    /// Arrow IPC format)
    /// Within the same spill file, the data might be chunked into multiple batches,
    /// and ordered by sort keys.
    finished_spill_files: Vec<SortedSpillFile>,

    // ========================================================================
    // EXECUTION RESOURCES:
    // Fields related to managing execution resources and monitoring performance.
    // ========================================================================
    /// Runtime metrics
    metrics: ExternalSorterMetrics,
    /// A handle to the runtime to get spill files
    runtime: Arc<RuntimeEnv>,
    /// Reservation for in_mem_batches
    reservation: MemoryReservation,
    /// Creates and tracks this sorter's spill files
    spill_manager: SpillManager,

    /// Reservation for the merging of in-memory batches. If the sort
    /// might spill, `sort_spill_reservation_bytes` will be
    /// pre-reserved to ensure there is some space for this sort/merge.
    merge_reservation: MemoryReservation,
    /// How much memory to reserve for performing in-memory sort/merges
    /// prior to spilling.
    sort_spill_reservation_bytes: usize,
}
261
262impl ExternalSorter {
263    // TODO: make a builder or some other nicer API to avoid the
264    // clippy warning
265    #[expect(clippy::too_many_arguments)]
266    pub fn new(
267        partition_id: usize,
268        schema: SchemaRef,
269        expr: LexOrdering,
270        batch_size: usize,
271        sort_spill_reservation_bytes: usize,
272        sort_in_place_threshold_bytes: usize,
273        // Configured via `datafusion.execution.spill_compression`.
274        spill_compression: SpillCompression,
275        metrics: &ExecutionPlanMetricsSet,
276        runtime: Arc<RuntimeEnv>,
277    ) -> Result<Self> {
278        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
279        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
280            .with_can_spill(true)
281            .register(&runtime.memory_pool);
282
283        let merge_reservation =
284            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
285                .register(&runtime.memory_pool);
286
287        let spill_manager = SpillManager::new(
288            Arc::clone(&runtime),
289            metrics.spill_metrics.clone(),
290            Arc::clone(&schema),
291        )
292        .with_compression_type(spill_compression);
293
294        Ok(Self {
295            schema,
296            in_mem_batches: vec![],
297            in_progress_spill_file: None,
298            finished_spill_files: vec![],
299            expr,
300            metrics,
301            reservation,
302            spill_manager,
303            merge_reservation,
304            runtime,
305            batch_size,
306            sort_spill_reservation_bytes,
307            sort_in_place_threshold_bytes,
308        })
309    }
310
311    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
312    ///
313    /// Updates memory usage metrics, and possibly triggers spilling to disk
314    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
315        if input.num_rows() == 0 {
316            return Ok(());
317        }
318
319        self.reserve_memory_for_merge()?;
320        self.reserve_memory_for_batch_and_maybe_spill(&input)
321            .await?;
322
323        self.in_mem_batches.push(input);
324        Ok(())
325    }
326
    /// Returns `true` if any data has already been written to a finished
    /// spill file on disk.
    fn spilled_before(&self) -> bool {
        !self.finished_spill_files.is_empty()
    }
330
331    /// Returns the final sorted output of all batches inserted via
332    /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
333    ///
334    /// This process could either be:
335    ///
336    /// 1. An in-memory sort/merge (if the input fit in memory)
337    ///
338    /// 2. A combined streaming merge incorporating both in-memory
339    ///    batches and data from spill files on disk.
340    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
341        // Release the memory reserved for merge back to the pool so
342        // there is some left when `in_mem_sort_stream` requests an
343        // allocation.
344        self.merge_reservation.free();
345
346        if self.spilled_before() {
347            // Sort `in_mem_batches` and spill it first. If there are many
348            // `in_mem_batches` and the memory limit is almost reached, merging
349            // them with the spilled files at the same time might cause OOM.
350            if !self.in_mem_batches.is_empty() {
351                self.sort_and_spill_in_mem_batches().await?;
352            }
353
354            StreamingMergeBuilder::new()
355                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
356                .with_spill_manager(self.spill_manager.clone())
357                .with_schema(Arc::clone(&self.schema))
358                .with_expressions(&self.expr.clone())
359                .with_metrics(self.metrics.baseline.clone())
360                .with_batch_size(self.batch_size)
361                .with_fetch(None)
362                .with_reservation(self.merge_reservation.new_empty())
363                .build()
364        } else {
365            self.in_mem_sort_stream(self.metrics.baseline.clone())
366        }
367    }
368
    /// How much memory is buffered in this `ExternalSorter`?
    ///
    /// Reflects the current size of the batch-buffer memory reservation.
    fn used(&self) -> usize {
        self.reservation.size()
    }

    /// How many bytes have been spilled to disk? (cumulative metric counter)
    fn spilled_bytes(&self) -> usize {
        self.metrics.spill_metrics.spilled_bytes.value()
    }

    /// How many rows have been spilled to disk? (cumulative metric counter)
    fn spilled_rows(&self) -> usize {
        self.metrics.spill_metrics.spilled_rows.value()
    }

    /// How many spill files have been created? (cumulative metric counter)
    fn spill_count(&self) -> usize {
        self.metrics.spill_metrics.spill_file_count.value()
    }
388
    /// Appends globally sorted batches to the in-progress spill file, and clears
    /// the `globally_sorted_batches` (also its memory reservation) afterwards.
    async fn consume_and_spill_append(
        &mut self,
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        if globally_sorted_batches.is_empty() {
            return Ok(());
        }

        // Lazily initialize the in-progress spill file
        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
        }

        // Compact `StringViewArray` payload buffers before writing so each
        // batch on disk only references its own data (see that method's docs).
        Self::organize_stringview_arrays(globally_sorted_batches)?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let batches_to_spill = std::mem::take(globally_sorted_batches);
        // The batches are moving to disk, so release the memory reserved for
        // them back to the pool.
        self.reservation.free();

        let (in_progress_file, max_record_batch_size) =
            self.in_progress_spill_file.as_mut().ok_or_else(|| {
                internal_datafusion_err!("In-progress spill file should be initialized")
            })?;

        for batch in batches_to_spill {
            in_progress_file.append_batch(&batch)?;

            // Track the largest single batch written to this file; recorded
            // into `SortedSpillFile` — presumably used to size memory when the
            // file is read back for merging (see `spill_finish`).
            *max_record_batch_size =
                (*max_record_batch_size).max(batch.get_sliced_size()?);
        }

        assert_or_internal_err!(
            globally_sorted_batches.is_empty(),
            "This function consumes globally_sorted_batches, so it should be empty after taking."
        );

        Ok(())
    }
431
432    /// Finishes the in-progress spill file and moves it to the finished spill files.
433    async fn spill_finish(&mut self) -> Result<()> {
434        let (mut in_progress_file, max_record_batch_memory) =
435            self.in_progress_spill_file.take().ok_or_else(|| {
436                internal_datafusion_err!("Should be called after `spill_append`")
437            })?;
438        let spill_file = in_progress_file.finish()?;
439
440        if let Some(spill_file) = spill_file {
441            self.finished_spill_files.push(SortedSpillFile {
442                file: spill_file,
443                max_record_batch_memory,
444            });
445        }
446
447        Ok(())
448    }
449
450    /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
451    /// `StringViewArray` in sequential order by calling `gc()` on them.
452    ///
453    /// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
454    /// available
455    ///
456    /// # Rationale
457    /// After (merge-based) sorting, all batches will be sorted into a single run,
458    /// but physically this sorted run is chunked into many small batches. For
459    /// `StringViewArray`s inside each sorted run, their inner buffers are not
460    /// re-constructed by default, leading to non-sequential payload locations
461    /// (permutated by `interleave()` Arrow kernel). A single payload buffer might
462    /// be shared by multiple `RecordBatch`es.
463    /// When writing each batch to disk, the writer has to write all referenced buffers,
464    /// because they have to be read back one by one to reduce memory usage. This
465    /// causes extra disk reads and writes, and potentially execution failure.
466    ///
467    /// # Example
468    /// Before sorting:
469    /// batch1 -> buffer1
470    /// batch2 -> buffer2
471    ///
472    /// sorted_batch1 -> buffer1
473    ///               -> buffer2
474    /// sorted_batch2 -> buffer1
475    ///               -> buffer2
476    ///
477    /// Then when spilling each batch, the writer has to write all referenced buffers
478    /// repeatedly.
479    fn organize_stringview_arrays(
480        globally_sorted_batches: &mut Vec<RecordBatch>,
481    ) -> Result<()> {
482        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
483
484        for batch in globally_sorted_batches.drain(..) {
485            let mut new_columns: Vec<Arc<dyn Array>> =
486                Vec::with_capacity(batch.num_columns());
487
488            let mut arr_mutated = false;
489            for array in batch.columns() {
490                if let Some(string_view_array) =
491                    array.as_any().downcast_ref::<StringViewArray>()
492                {
493                    let new_array = string_view_array.gc();
494                    new_columns.push(Arc::new(new_array));
495                    arr_mutated = true;
496                } else {
497                    new_columns.push(Arc::clone(array));
498                }
499            }
500
501            let organized_batch = if arr_mutated {
502                RecordBatch::try_new(batch.schema(), new_columns)?
503            } else {
504                batch
505            };
506
507            organized_batches.push(organized_batch);
508        }
509
510        *globally_sorted_batches = organized_batches;
511
512        Ok(())
513    }
514
    /// Sorts the in-memory batches and merges them into a single sorted run, then writes
    /// the result to spill files.
    ///
    /// On success, `in_mem_batches` is empty, the run has been flushed to a
    /// finished spill file, and merge headroom is re-reserved for the next round.
    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
        assert_or_internal_err!(
            !self.in_mem_batches.is_empty(),
            "in_mem_batches must not be empty when attempting to sort and spill"
        );

        // Release the memory reserved for merge back to the pool so
        // there is some left when `in_mem_sort_stream` requests an
        // allocation. At the end of this function, memory will be
        // reserved again for the next spill.
        self.merge_reservation.free();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
        // to construct a globally sorted stream.
        assert_or_internal_err!(
            self.in_mem_batches.is_empty(),
            "in_mem_batches should be empty after constructing sorted stream"
        );
        // 'global' here refers to all buffered batches when the memory limit is
        // reached. This variable will buffer the sorted batches after
        // sort-preserving merge and incrementally append to spill files.
        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

        // Drain the sorted stream, buffering batches until memory runs out,
        // at which point the buffered run is appended to the spill file.
        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
            if self.reservation.try_grow(sorted_size).is_err() {
                // Although the reservation is not enough, the batch is
                // already in memory, so it's okay to combine it with previously
                // sorted batches, and spill together.
                globally_sorted_batches.push(batch);
                self.consume_and_spill_append(&mut globally_sorted_batches)
                    .await?; // reservation is freed in spill()
            } else {
                globally_sorted_batches.push(batch);
            }
        }

        // Drop early to free up memory reserved by the sorted stream, otherwise the
        // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
        drop(sorted_stream);

        // Flush whatever remains buffered, then seal the spill file.
        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?;
        self.spill_finish().await?;

        // Sanity check after spilling
        let buffers_cleared_property =
            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
        assert_or_internal_err!(
            buffers_cleared_property,
            "in_mem_batches and globally_sorted_batches should be cleared before"
        );

        // Reserve headroom for next sort/merge
        self.reserve_memory_for_merge()?;

        Ok(())
    }
578
579    /// Consumes in_mem_batches returning a sorted stream of
580    /// batches. This proceeds in one of two ways:
581    ///
582    /// # Small Datasets
583    ///
584    /// For "smaller" datasets, the data is first concatenated into a
585    /// single batch and then sorted. This is often faster than
586    /// sorting and then merging.
587    ///
588    /// ```text
589    ///        ┌─────┐
590    ///        │  2  │
591    ///        │  3  │
592    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
593    ///        │  4  │                     │  2  │
594    ///        │  2  │        │            │  3  │
595    ///        └─────┘                     │  1  │             sorted output
596    ///        ┌─────┐        ▼            │  4  │                stream
597    ///        │  1  │                     │  2  │
598    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
599    ///        │  1  │                     │  4  │
600    ///        └─────┘        ▲            │  1  │
601    ///          ...          │            │ ... │
602    ///                                    │  4  │
603    ///        ┌─────┐        │            │  3  │
604    ///        │  4  │                     └─────┘
605    ///        │  3  │─ ─ ─ ─ ┘
606    ///        └─────┘
607    ///     in_mem_batches
608    /// ```
609    ///
610    /// # Larger datasets
611    ///
612    /// For larger datasets, the batches are first sorted individually
613    /// and then merged together.
614    ///
615    /// ```text
616    ///      ┌─────┐                ┌─────┐
617    ///      │  2  │                │  1  │
618    ///      │  3  │                │  2  │
619    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
620    ///      │  4  │                │  3  │
621    ///      │  2  │                │  4  │          │
622    ///      └─────┘                └─────┘               sorted output
623    ///      ┌─────┐                ┌─────┐          ▼       stream
624    ///      │  1  │                │  1  │
625    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
626    ///      │  1  │                │  4  │
627    ///      └─────┘                └─────┘          ▲
628    ///        ...       ...         ...             │
629    ///
630    ///      ┌─────┐                ┌─────┐          │
631    ///      │  4  │                │  3  │
632    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
633    ///      └─────┘                └─────┘
634    ///
635    ///   in_mem_batches
636    /// ```
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        // No buffered data: return an empty stream with the output schema.
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }

        // The elapsed compute timer is updated when the value is dropped.
        // There is no need for an explicit call to drop.
        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();

        // Please pay attention that any operation inside of `in_mem_sort_stream` will
        // not perform any memory reservation. This is for avoiding the need of handling
        // reservation failure and spilling in the middle of the sort/merge. The memory
        // space for batches produced by the resulting stream will be reserved by the
        // consumer of the stream.

        // Fast path: a single buffered batch is sorted directly, handing its
        // entire reservation over to the output stream.
        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            // Concatenate memory batches together and sort
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            // Resize the reservation to the concatenated batch, since
            // concatenation can change the data's actual memory footprint.
            self.reservation
                .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
                .map_err(Self::err_with_oom_context)?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, &metrics, reservation);
        }

        // Otherwise sort each buffered batch separately (each stream carries
        // its split-off share of the reservation) and merge the sorted streams.
        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                let reservation = self
                    .reservation
                    .split(get_reserved_bytes_for_record_batch(&batch)?);
                let input = self.sort_batch_stream(batch, &metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;

        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(&self.expr.clone())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(None)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }
698
    /// Sorts a single `RecordBatch` into a single stream.
    ///
    /// This may output multiple batches depending on the size of the
    /// sorted data and the target batch size.
    /// For single-batch output cases, `reservation` will be freed immediately after sorting,
    /// as the batch will be output and is expected to be reserved by the consumer of the stream.
    /// For multi-batch output cases, `reservation` will be grown to match the actual
    /// size of sorted output, and as each batch is output, its memory will be freed from the reservation.
    /// (This leads to the same behaviour, as futures are only evaluated when polled by the consumer.)
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: &BaselineMetrics,
        mut reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        // Precondition: the caller must hand over a reservation that exactly
        // covers `batch` (see the call sites in `in_mem_sort_stream`).
        assert_eq!(
            get_reserved_bytes_for_record_batch(&batch)?,
            reservation.size()
        );

        let schema = batch.schema();
        let expressions = self.expr.clone();
        let batch_size = self.batch_size;
        let output_row_metrics = metrics.output_rows().clone();

        // The sort itself runs lazily, the first time the stream is polled.
        let stream = futures::stream::once(async move {
            let schema = batch.schema();

            // Sort the batch immediately and get all output batches
            let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
            // Drop the unsorted input as early as possible to reduce peak memory.
            drop(batch);

            // Resize the reservation to match the actual sorted output size.
            // Using try_resize avoids a release-then-reacquire cycle, which
            // matters for MemoryPool implementations where grow/shrink have
            // non-trivial cost (e.g. JNI calls in Comet).
            let total_sorted_size: usize = sorted_batches
                .iter()
                .map(get_record_batch_memory_size)
                .sum();
            reservation
                .try_resize(total_sorted_size)
                .map_err(Self::err_with_oom_context)?;

            // Wrap in ReservationStream to hold the reservation
            Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
                Arc::clone(&schema),
                Box::pin(RecordBatchStreamAdapter::new(
                    schema,
                    futures::stream::iter(sorted_batches.into_iter().map(Ok)),
                )),
                reservation,
            )) as SendableRecordBatchStream)
        })
        .try_flatten()
        // Count output rows as each sorted batch flows to the consumer.
        .map(move |batch| match batch {
            Ok(batch) => {
                output_row_metrics.add(batch.num_rows());
                Ok(batch)
            }
            Err(e) => Err(e),
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
764
765    /// If this sort may spill, pre-allocates
766    /// `sort_spill_reservation_bytes` of memory to guarantee memory
767    /// left for the in memory sort/merge.
768    fn reserve_memory_for_merge(&mut self) -> Result<()> {
769        // Reserve headroom for next merge sort
770        if self.runtime.disk_manager.tmp_files_enabled() {
771            let size = self.sort_spill_reservation_bytes;
772            if self.merge_reservation.size() != size {
773                self.merge_reservation
774                    .try_resize(size)
775                    .map_err(Self::err_with_oom_context)?;
776            }
777        }
778
779        Ok(())
780    }
781
782    /// Reserves memory to be able to accommodate the given batch.
783    /// If memory is scarce, tries to spill current in-memory batches to disk first.
784    async fn reserve_memory_for_batch_and_maybe_spill(
785        &mut self,
786        input: &RecordBatch,
787    ) -> Result<()> {
788        let size = get_reserved_bytes_for_record_batch(input)?;
789
790        match self.reservation.try_grow(size) {
791            Ok(_) => Ok(()),
792            Err(e) => {
793                if self.in_mem_batches.is_empty() {
794                    return Err(Self::err_with_oom_context(e));
795                }
796
797                // Spill and try again.
798                self.sort_and_spill_in_mem_batches().await?;
799                self.reservation
800                    .try_grow(size)
801                    .map_err(Self::err_with_oom_context)
802            }
803        }
804    }
805
806    /// Wraps the error with a context message suggesting settings to tweak.
807    /// This is meant to be used with DataFusionError::ResourcesExhausted only.
808    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
809        match e {
810            DataFusionError::ResourcesExhausted(_) => e.context(
811                "Not enough memory to continue external sort. \
812                    Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
813            ),
814            // This is not an OOM error, so just return it as is.
815            _ => e,
816        }
817    }
818}
819
/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
/// creating sorted copies of sorted columns in record batches for speeding up comparison
/// in sorting and merging. The sorted copies are in either row format or array format.
/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
/// sorted copies are, they will use more memory than the original record batch.
///
/// The estimate is the sum of the space the batch actually occupies in memory
/// (which can be larger for a sliced batch) and the size of the data itself.
/// This ~2x factor may still be too low for some workloads; in that case
/// `sort_spill_reservation_bytes` can be raised to compensate.
pub(crate) fn get_reserved_bytes_for_record_batch_size(
    record_batch_size: usize,
    sliced_size: usize,
) -> usize {
    sliced_size + record_batch_size
}
839
840/// Estimate how much memory is needed to sort a `RecordBatch`.
841/// This will just call `get_reserved_bytes_for_record_batch_size` with the
842/// memory size of the record batch and its sliced size.
843pub(super) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result<usize> {
844    Ok(get_reserved_bytes_for_record_batch_size(
845        get_record_batch_memory_size(batch),
846        batch.get_sliced_size()?,
847    ))
848}
849
850impl Debug for ExternalSorter {
851    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
852        f.debug_struct("ExternalSorter")
853            .field("memory_used", &self.used())
854            .field("spilled_bytes", &self.spilled_bytes())
855            .field("spilled_rows", &self.spilled_rows())
856            .field("spill_count", &self.spill_count())
857            .finish()
858    }
859}
860
861pub fn sort_batch(
862    batch: &RecordBatch,
863    expressions: &LexOrdering,
864    fetch: Option<usize>,
865) -> Result<RecordBatch> {
866    let sort_columns = expressions
867        .iter()
868        .map(|expr| expr.evaluate_to_sort_column(batch))
869        .collect::<Result<Vec<_>>>()?;
870
871    let indices = lexsort_to_indices(&sort_columns, fetch)?;
872    let columns = take_arrays(batch.columns(), &indices, None)?;
873
874    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
875    Ok(RecordBatch::try_new_with_options(
876        batch.schema(),
877        columns,
878        &options,
879    )?)
880}
881
882/// Sort a batch and return the result as multiple batches of size `batch_size`.
883/// This is useful when you want to avoid creating one large sorted batch in memory,
884/// and instead want to process the sorted data in smaller chunks.
885pub fn sort_batch_chunked(
886    batch: &RecordBatch,
887    expressions: &LexOrdering,
888    batch_size: usize,
889) -> Result<Vec<RecordBatch>> {
890    let sort_columns = expressions
891        .iter()
892        .map(|expr| expr.evaluate_to_sort_column(batch))
893        .collect::<Result<Vec<_>>>()?;
894
895    let indices = lexsort_to_indices(&sort_columns, None)?;
896
897    // Split indices into chunks of batch_size
898    let num_rows = indices.len();
899    let num_chunks = num_rows.div_ceil(batch_size);
900
901    let result_batches = (0..num_chunks)
902        .map(|chunk_idx| {
903            let start = chunk_idx * batch_size;
904            let end = (start + batch_size).min(num_rows);
905            let chunk_len = end - start;
906
907            // Create a slice of indices for this chunk
908            let chunk_indices = indices.slice(start, chunk_len);
909
910            // Take the columns using this chunk of indices
911            let columns = take_arrays(batch.columns(), &chunk_indices, None)?;
912
913            let options = RecordBatchOptions::new().with_row_count(Some(chunk_len));
914            let chunk_batch =
915                RecordBatch::try_new_with_options(batch.schema(), columns, &options)?;
916
917            Ok(chunk_batch)
918        })
919        .collect::<Result<Vec<RecordBatch>>>()?;
920
921    Ok(result_batches)
922}
923
/// Sort execution plan.
///
/// Support sorting datasets that are larger than the memory allotted
/// by the memory manager, by spilling to disk.
#[derive(Debug, Clone)]
pub struct SortExec {
    /// The input execution plan whose output will be sorted.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions
    expr: LexOrdering,
    /// Containing all metrics set created during sort
    metrics_set: ExecutionPlanMetricsSet,
    /// Preserve partitions of input plan. If false, the input partitions
    /// will be sorted and merged into a single output partition.
    preserve_partitioning: bool,
    /// Fetch highest/lowest n results
    fetch: Option<usize>,
    /// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
    common_sort_prefix: Vec<PhysicalSortExpr>,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
    /// Filter matching the state of the sort for dynamic filter pushdown.
    /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
    /// If `fetch` is `None`, this will be `None`.
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
950
impl SortExec {
    /// Create a new sort execution plan that produces a single,
    /// sorted output partition.
    ///
    /// NOTE(review): errors from `compute_properties` are unwrapped here,
    /// so constructing with an expression/input combination that fails
    /// property computation panics rather than returning `Err` — confirm
    /// all call sites rely on this being infallible.
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let preserve_partitioning = false;
        let (cache, sort_prefix) =
            Self::compute_properties(&input, expr.clone(), preserve_partitioning)
                .unwrap();
        Self {
            expr,
            input,
            metrics_set: ExecutionPlanMetricsSet::new(),
            preserve_partitioning,
            fetch: None,
            common_sort_prefix: sort_prefix,
            cache,
            filter: None,
        }
    }

    /// Whether this `SortExec` preserves partitioning of the children
    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    /// Specify the partitioning behavior of this sort exec
    ///
    /// If `preserve_partitioning` is true, sorts each partition
    /// individually, producing one sorted stream for each input partition.
    ///
    /// If `preserve_partitioning` is false, sorts and merges all
    /// input partitions producing a single, sorted partition.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        // Keep the cached plan properties consistent with the new
        // partitioning choice.
        self.cache = self
            .cache
            .with_partitioning(Self::output_partitioning_helper(
                &self.input,
                self.preserve_partitioning,
            ));
        self
    }

    /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
    ///
    /// The filter starts out as the always-true literal and references every
    /// sort expression, so it can later be tightened during TopK execution.
    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
        let children = self
            .expr
            .iter()
            .map(|sort_expr| Arc::clone(&sort_expr.expr))
            .collect::<Vec<_>>();
        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(children, lit(true)),
        ))))
    }

    /// Field-by-field clone; shares the input plan, metrics set and dynamic
    /// filter via `Arc`/internal sharing rather than deep-copying them.
    fn cloned(&self) -> Self {
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            common_sort_prefix: self.common_sort_prefix.clone(),
            fetch: self.fetch,
            cache: self.cache.clone(),
            filter: self.filter.clone(),
        }
    }

    /// Modify how many rows to include in the result
    ///
    /// If None, then all rows will be returned, in sorted order.
    /// If Some, then only the top `fetch` rows will be returned.
    /// This can reduce the memory pressure required by the sort
    /// operation since rows that are not going to be included
    /// can be dropped.
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = self.cache.clone();
        // If the SortExec can emit incrementally (that means the sort requirements
        // and properties of the input match), the SortExec can generate its result
        // without scanning the entire input when a fetch value exists.
        let is_pipeline_friendly = matches!(
            self.cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        let filter = fetch.is_some().then(|| {
            // If we already have a filter, keep it. Otherwise, create a new one.
            self.filter.clone().unwrap_or_else(|| self.create_filter())
        });
        let mut new_sort = self.cloned();
        new_sort.fetch = fetch;
        new_sort.cache = cache;
        new_sort.filter = filter;
        new_sort
    }

    /// Input schema
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions
    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    /// If `Some(fetch)`, limits output to only the first "fetch" items
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Output partitioning: the input's partitioning when preserved,
    /// otherwise a single (merged) partition.
    fn output_partitioning_helper(
        input: &Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Partitioning {
        // Get output partitioning:
        if preserve_partitioning {
            input.output_partitioning().clone()
        } else {
            Partitioning::UnknownPartitioning(1)
        }
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    /// It also returns the common sort prefix between the input and the sort expressions.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
        let (sort_prefix, sort_satisfied) = input
            .equivalence_properties()
            .extract_common_sort_prefix(sort_exprs.clone())?;

        // The emission type depends on whether the input is already sorted:
        // - If already fully sorted, we can emit results in the same way as the input
        // - If not sorted, we must wait until all data is processed to emit results (Final)
        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };

        // The boundedness depends on whether the input is already sorted:
        // - If already sorted, we have the same property as the input
        // - If not sorted and input is unbounded, we require infinite memory and generates
        //   unbounded data (not practical).
        // - If not sorted and input is bounded, then the SortExec is bounded, too.
        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };

        // Calculate equivalence properties; i.e. reset the ordering equivalence
        // class with the new ordering:
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.reorder(sort_exprs)?;

        // Get output partitioning:
        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);

        Ok((
            PlanProperties::new(
                eq_properties,
                output_partitioning,
                emission_type,
                boundedness,
            ),
            sort_prefix,
        ))
    }
}
1132
impl DisplayAs for SortExec {
    /// Renders this node for `EXPLAIN` output.
    ///
    /// With a `fetch` value the node displays as `SortExec: TopK(...)`,
    /// optionally including the current dynamic filter (only when it has
    /// been tightened beyond the initial `true` literal) and the common
    /// sort prefix. Without `fetch` it displays as a plain `SortExec`.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let preserve_partitioning = self.preserve_partitioning;
                match self.fetch {
                    Some(fetch) => {
                        write!(
                            f,
                            "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                            self.expr
                        )?;
                        // Only show the filter when it carries real state,
                        // i.e. it is no longer the trivial `true` predicate.
                        if let Some(filter) = &self.filter
                            && let Ok(current) = filter.read().expr().current()
                            && !current.eq(&lit(true))
                        {
                            write!(f, ", filter=[{current}]")?;
                        }
                        if !self.common_sort_prefix.is_empty() {
                            write!(f, ", sort_prefix=[")?;
                            // Comma-separate the prefix expressions by hand.
                            let mut first = true;
                            for sort_expr in &self.common_sort_prefix {
                                if first {
                                    first = false;
                                } else {
                                    write!(f, ", ")?;
                                }
                                write!(f, "{sort_expr}")?;
                            }
                            write!(f, "]")
                        } else {
                            Ok(())
                        }
                    }
                    None => write!(
                        f,
                        "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
                        self.expr
                    ),
                }
            }
            // Tree rendering is more compact: expressions on one line, and
            // the limit (if any) on the next.
            DisplayFormatType::TreeRender => match self.fetch {
                Some(fetch) => {
                    writeln!(f, "{}", self.expr)?;
                    writeln!(f, "limit={fetch}")
                }
                None => {
                    writeln!(f, "{}", self.expr)
                }
            },
        }
    }
}
1186
impl ExecutionPlan for SortExec {
    /// The node name reflects whether a TopK (fetch) is in effect.
    fn name(&self) -> &'static str {
        match self.fetch {
            Some(_) => "SortExec(TopK)",
            None => "SortExec",
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// A global (merging) sort requires a single input partition; a
    /// partition-preserving sort accepts any distribution.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            // global sort
            // TODO support RangePartition and OrderedDistribution
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    /// Rebuilds this node around a new child, recomputing the cached plan
    /// properties (they depend on the child's ordering/partitioning).
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut new_sort = self.cloned();
        assert_eq!(children.len(), 1, "SortExec should have exactly one child");
        new_sort.input = Arc::clone(&children[0]);
        // Recompute the properties based on the new input since they may have changed
        let (cache, sort_prefix) = Self::compute_properties(
            &new_sort.input,
            new_sort.expr.clone(),
            new_sort.preserve_partitioning,
        )?;
        new_sort.cache = cache;
        new_sort.common_sort_prefix = sort_prefix;

        Ok(Arc::new(new_sort))
    }

    /// Produces a copy of this node with fresh dynamic-filter and metrics
    /// state, suitable for re-execution.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        let new_sort = self.with_new_children(children)?;
        let mut new_sort = new_sort
            .as_any()
            .downcast_ref::<SortExec>()
            .expect("cloned 1 lines above this line, we know the type")
            .clone();
        // Our dynamic filter and execution metrics are the state we need to reset.
        new_sort.filter = Some(new_sort.create_filter());
        new_sort.metrics_set = ExecutionPlanMetricsSet::new();

        Ok(Box::pin as fn(_) -> _; Ok(Arc::new(new_sort))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start SortExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {partition}");

        // Check whether the input already satisfies the requested ordering.
        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy(self.expr.clone())?;

        // Dispatch on (already sorted?, fetch limit?):
        match (sort_satisfied, self.fetch.as_ref()) {
            // Already sorted + limit: just truncate the stream.
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            // Already sorted, no limit: pass the input through untouched.
            (true, None) => Ok(input),
            // Not sorted + limit: run the TopK operator, which can stop
            // early once `fetch` rows are guaranteed.
            (false, Some(fetch)) => {
                let filter = self.filter.clone();
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.common_sort_prefix.clone(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                    Arc::clone(&unwrap_or_internal_err!(filter)),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                            if topk.finished {
                                break;
                            }
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            // Not sorted, no limit: full external sort (may spill to disk).
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    context.session_config().spill_compression(),
                    &self.metrics_set,
                    context.runtime_env(),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort().await
                    })
                    .try_flatten(),
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Statistics pass through from the input, adjusted for `fetch`.
    /// When partitions are merged, per-partition statistics are meaningless,
    /// so the overall input statistics are used instead.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if !self.preserve_partitioning() {
            return self
                .input
                .partition_statistics(None)?
                .with_fetch(self.fetch, 0, 1);
        }
        self.input
            .partition_statistics(partition)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Sorting never changes row count; a fetch limit can only reduce it.
    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.fetch.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    /// Tries to swap the projection with its input [`SortExec`]. If it can be done,
    /// it returns the new swapped version having the [`SortExec`] as the top plan.
    /// Otherwise, it returns None.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If the projection does not narrow the schema, we should not try to push it down.
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }

        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
        else {
            return Ok(None);
        };

        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }

    /// Forwards parent filters to the child; in the `Post` phase also pushes
    /// down this sort's own TopK dynamic filter when enabled by config.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &datafusion_common::config::ConfigOptions,
    ) -> Result<FilterDescription> {
        if !matches!(phase, FilterPushdownPhase::Post) {
            return FilterDescription::from_children(parent_filters, &self.children());
        }

        let mut child =
            ChildFilterDescription::from_child(&parent_filters, self.input())?;

        if let Some(filter) = &self.filter
            && config.optimizer.enable_topk_dynamic_filter_pushdown
        {
            child = child.with_self_filter(filter.read().expr());
        }

        Ok(FilterDescription::new().with_child(child))
    }
}
1423
1424#[cfg(test)]
1425mod tests {
1426    use std::collections::HashMap;
1427    use std::pin::Pin;
1428    use std::task::{Context, Poll};
1429
1430    use super::*;
1431    use crate::coalesce_partitions::CoalescePartitionsExec;
1432    use crate::collect;
1433    use crate::execution_plan::Boundedness;
1434    use crate::expressions::col;
1435    use crate::test;
1436    use crate::test::TestMemoryExec;
1437    use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
1438    use crate::test::{assert_is_pending, make_partition};
1439
1440    use arrow::array::*;
1441    use arrow::compute::SortOptions;
1442    use arrow::datatypes::*;
1443    use datafusion_common::cast::as_primitive_array;
1444    use datafusion_common::test_util::batches_to_string;
1445    use datafusion_common::{DataFusionError, Result, ScalarValue};
1446    use datafusion_execution::RecordBatchStream;
1447    use datafusion_execution::config::SessionConfig;
1448    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1449    use datafusion_physical_expr::EquivalenceProperties;
1450    use datafusion_physical_expr::expressions::{Column, Literal};
1451
1452    use futures::{FutureExt, Stream};
1453    use insta::assert_snapshot;
1454
    /// Test operator producing an endless, already-sorted stream of batches
    /// (see `SortedUnboundedStream`).
    #[derive(Debug, Clone)]
    pub struct SortedUnboundedExec {
        /// Schema of the emitted batches.
        schema: Schema,
        /// Number of rows generated per batch.
        batch_size: u64,
        /// Pre-computed plan properties (built by `compute_properties`).
        cache: PlanProperties,
    }
1461
1462    impl DisplayAs for SortedUnboundedExec {
1463        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1464            match t {
1465                DisplayFormatType::Default
1466                | DisplayFormatType::Verbose
1467                | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1468            }
1469            Ok(())
1470        }
1471    }
1472
1473    impl SortedUnboundedExec {
1474        fn compute_properties(schema: SchemaRef) -> PlanProperties {
1475            let mut eq_properties = EquivalenceProperties::new(schema);
1476            eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
1477                Column::new("c1", 0),
1478            ))]);
1479            PlanProperties::new(
1480                eq_properties,
1481                Partitioning::UnknownPartitioning(1),
1482                EmissionType::Final,
1483                Boundedness::Unbounded {
1484                    requires_infinite_memory: false,
1485                },
1486            )
1487        }
1488    }
1489
    impl ExecutionPlan for SortedUnboundedExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.cache
        }

        // Leaf node: no children.
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }

        /// Returns a `SortedUnboundedStream` starting at offset 0; the
        /// partition and context are ignored by this mock.
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            Ok(Box::pin(SortedUnboundedStream {
                schema: Arc::new(self.schema.clone()),
                batch_size: self.batch_size,
                offset: 0,
            }))
        }
    }
1526
    /// Endless stream of single-column batches with monotonically
    /// increasing u64 values (see `create_record_batch`).
    #[derive(Debug)]
    pub struct SortedUnboundedStream {
        /// Schema attached to every emitted batch.
        schema: SchemaRef,
        /// Rows per emitted batch.
        batch_size: u64,
        /// First value of the next batch; advanced by `batch_size` per poll.
        offset: u64,
    }
1533
1534    impl Stream for SortedUnboundedStream {
1535        type Item = Result<RecordBatch>;
1536
1537        fn poll_next(
1538            mut self: Pin<&mut Self>,
1539            _cx: &mut Context<'_>,
1540        ) -> Poll<Option<Self::Item>> {
1541            let batch = SortedUnboundedStream::create_record_batch(
1542                Arc::clone(&self.schema),
1543                self.offset,
1544                self.batch_size,
1545            );
1546            self.offset += self.batch_size;
1547            Poll::Ready(Some(Ok(batch)))
1548        }
1549    }
1550
    impl RecordBatchStream for SortedUnboundedStream {
        /// Schema shared by every batch this stream emits.
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }
1556
1557    impl SortedUnboundedStream {
1558        fn create_record_batch(
1559            schema: SchemaRef,
1560            offset: u64,
1561            batch_size: u64,
1562        ) -> RecordBatch {
1563            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1564            let array = UInt64Array::from(values);
1565            let array_ref: ArrayRef = Arc::new(array);
1566            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1567        }
1568    }
1569
    /// Sorts four coalesced partitions fully in memory and verifies that
    /// all reserved memory is returned to the pool afterwards.
    #[tokio::test]
    async fn test_in_mem_sort() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let partitions = 4;
        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        // Merge all partitions into one, then sort by column "i".
        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(csv)),
        ));

        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;

        // All input rows end up in one sorted output batch.
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 400);
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1598
    #[tokio::test]
    async fn test_sort_spill() -> Result<()> {
        // trigger spill w/ 100 batches
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;
        // Memory limit = merge reservation + 12 KiB of working memory: far
        // less than the ~40 KB input, which forces the sort to spill.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // The input has 100 partitions, each partition has a batch containing 100 rows.
        // Each row has a single Int32 column with values 0..100. The total size of the
        // input is roughly 40000 bytes.
        let partitions = 100;
        let input = test::scan_partitioned(partitions);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        // The 10000 sorted rows are emitted in two output batches.
        assert_eq!(result.len(), 2);

        // Now, validate metrics
        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 10000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();
        // Processing 40000 bytes of data using 12288 bytes of memory requires 3 spills
        // unless we do something really clever. It will spill roughly 9000+ rows and 36000
        // bytes. We leave a little wiggle room for the actual numbers.
        assert!((3..=10).contains(&spill_count));
        assert!((9000..=10000).contains(&spilled_rows));
        assert!((38000..=44000).contains(&spilled_bytes));

        let columns = result[0].columns();

        // Spot-check the first output batch: it must start at the global
        // minimum (0); the first/second batch boundary falls at value 81.
        let i = as_primitive_array::<Int32Type>(&columns[0])?;
        assert_eq!(i.value(0), 0);
        assert_eq!(i.value(i.len() - 1), 81);
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1669
    #[tokio::test]
    async fn test_batch_reservation_error() -> Result<()> {
        // Pick a memory limit and sort_spill_reservation that make the first batch reservation fail.
        let merge_reservation: usize = 0; // Set to 0 for simplicity

        let session_config =
            SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);

        let plan = test::scan_partitioned(1);

        // Read the first record batch to determine the actual memory requirement
        let expected_batch_reservation = {
            let temp_ctx = Arc::new(TaskContext::default());
            let mut stream = plan.execute(0, Arc::clone(&temp_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            get_reserved_bytes_for_record_batch(&first_batch)?
        };

        // Set memory limit just short of what we need
        let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;

        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(memory_limit, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // Verify that our memory limit is insufficient
        // (sanity check: re-reading the batch reports the same reservation size)
        {
            let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            let batch_reservation = get_reserved_bytes_for_record_batch(&first_batch)?;

            assert_eq!(batch_reservation, expected_batch_reservation);
            assert!(memory_limit < (merge_reservation + batch_reservation));
        }

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
            plan,
        ));

        // The sort must fail: reserving memory for the very first input batch
        // already exceeds the configured limit.
        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;

        let err = result.unwrap_err();
        assert!(
            matches!(err, DataFusionError::Context(..)),
            "Assertion failed: expected a Context error, but got: {err:?}"
        );

        // Assert that the context error is wrapping a resources exhausted error.
        assert!(
            matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
            "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
        );

        Ok(())
    }
1731
    #[tokio::test]
    async fn test_sort_spill_utf8_strings() -> Result<()> {
        // Variable-length (Utf8) data with a memory limit well below the input
        // size, forcing repeated spills of string batches.
        let session_config = SessionConfig::new()
            .with_batch_size(100)
            .with_sort_in_place_threshold_bytes(20 * 1024)
            .with_sort_spill_reservation_bytes(100 * 1024);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(500 * 1024, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // The input has 200 partitions, each partition has a batch containing 100 rows.
        // Each row has a single Utf8 column, the Utf8 string values are roughly 42 bytes.
        // The total size of the input is roughly 840 KB.
        let input = test::scan_partitioned_utf8(200);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;

        // All 200 * 100 input rows must come back out.
        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
        assert_eq!(num_rows, 20000);

        // Now, validate metrics
        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 20000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();

        // This test case is processing 840KB of data using 400KB of memory. Note
        // that buffered batches can't be dropped until all sorted batches are
        // generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
        // batches.
        // The number of spills is roughly calculated as:
        //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`

        // If this assertion fail with large spill count, make sure the following
        // case does not happen:
        // During external sorting, one sorted run should be spilled to disk in a
        // single file, due to memory limit we might need to append to the file
        // multiple times to spill all the data. Make sure we're not writing each
        // appending as a separate file.
        assert!((4..=8).contains(&spill_count));
        assert!((15000..=20000).contains(&spilled_rows));
        assert!((900000..=1000000).contains(&spilled_bytes));

        // Verify that the result is sorted
        let concated_result = concat_batches(&schema, &result)?;
        let columns = concated_result.columns();
        let string_array = as_string_array(&columns[0]);
        for i in 0..string_array.len() - 1 {
            assert!(string_array.value(i) <= string_array.value(i + 1));
        }

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1810
    #[tokio::test]
    async fn test_sort_fetch_memory_calculation() -> Result<()> {
        // This test mirrors down the size from the example above.
        let avg_batch_size = 400;
        let partitions = 4;

        // A tuple of (fetch, expect_spillage)
        let test_options = vec![
            // Since we don't have a limit (and the memory is less than the total size of
            // all the batches we are processing, we expect it to spill.
            (None, true),
            // When we have a limit however, the buffered size of batches should fit in memory
            // since it is much lower than the total size of the input batch.
            (Some(1), false),
        ];

        for (fetch, expect_spillage) in test_options {
            let session_config = SessionConfig::new();
            let sort_spill_reservation_bytes = session_config
                .options()
                .execution
                .sort_spill_reservation_bytes;

            // The limit leaves room for all but one average-sized batch, so an
            // unlimited sort must spill, while `fetch = 1` stays in memory.
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(
                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                    1.0,
                )
                .build_arc()?;
            let task_ctx = Arc::new(
                TaskContext::default()
                    .with_runtime(runtime)
                    .with_session_config(session_config),
            );

            let csv = test::scan_partitioned(partitions);
            let schema = csv.schema();

            let sort_exec = Arc::new(
                SortExec::new(
                    [PhysicalSortExpr {
                        expr: col("i", &schema)?,
                        options: SortOptions::default(),
                    }]
                    .into(),
                    Arc::new(CoalescePartitionsExec::new(csv)),
                )
                .with_fetch(fetch),
            );

            let result =
                collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
            assert_eq!(result.len(), 1);

            let metrics = sort_exec.metrics().unwrap();
            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
        }
        Ok(())
    }
1871
    #[tokio::test]
    async fn test_sort_memory_reduction_per_batch() -> Result<()> {
        // This test verifies that memory reservation is reduced for every batch emitted
        // during the sort process. This is important to ensure we don't hold onto
        // memory longer than necessary.

        // Create a large enough batch that will be split into multiple output batches
        let batch_size = 50; // Small batch size to force multiple output batches
        let num_rows = 1000; // Create enough data for multiple batches

        let task_ctx = Arc::new(
            TaskContext::default().with_session_config(
                SessionConfig::new()
                    .with_batch_size(batch_size)
                    .with_sort_in_place_threshold_bytes(usize::MAX), // Ensure we don't concat batches
            ),
        );

        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // Create unsorted data (descending values, so the sort has real work)
        let mut values: Vec<i32> = (0..num_rows).collect();
        values.reverse();

        let input_batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values))],
        )?;

        let batches = vec![input_batch];

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: Arc::new(Column::new("a", 0)),
                options: SortOptions::default(),
            }]
            .into(),
            TestMemoryExec::try_new_exec(
                std::slice::from_ref(&batches),
                Arc::clone(&schema),
                None,
            )?,
        ));

        let mut stream = sort_exec.execute(0, Arc::clone(&task_ctx))?;

        // Track the pool's total reservation across emitted batches
        let mut previous_reserved = task_ctx.runtime_env().memory_pool.reserved();
        let mut batch_count = 0;

        // Collect batches and verify memory is reduced with each batch
        while let Some(result) = stream.next().await {
            let batch = result?;
            batch_count += 1;

            // Verify we got a non-empty batch
            assert!(batch.num_rows() > 0, "Batch should not be empty");

            let current_reserved = task_ctx.runtime_env().memory_pool.reserved();

            // After the first batch, memory should be reducing or staying the same
            // (it should not increase as we emit batches)
            if batch_count > 1 {
                assert!(
                    current_reserved <= previous_reserved,
                    "Memory reservation should decrease or stay same as batches are emitted. \
                     Batch {batch_count}: previous={previous_reserved}, current={current_reserved}"
                );
            }

            previous_reserved = current_reserved;
        }

        assert!(
            batch_count > 1,
            "Expected multiple batches to be emitted, got {batch_count}"
        );

        // Verify all memory is returned at the end
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "All memory should be returned after consuming all batches"
        );

        Ok(())
    }
1958
1959    #[tokio::test]
1960    async fn test_sort_metadata() -> Result<()> {
1961        let task_ctx = Arc::new(TaskContext::default());
1962        let field_metadata: HashMap<String, String> =
1963            vec![("foo".to_string(), "bar".to_string())]
1964                .into_iter()
1965                .collect();
1966        let schema_metadata: HashMap<String, String> =
1967            vec![("baz".to_string(), "barf".to_string())]
1968                .into_iter()
1969                .collect();
1970
1971        let mut field = Field::new("field_name", DataType::UInt64, true);
1972        field.set_metadata(field_metadata.clone());
1973        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1974        let schema = Arc::new(schema);
1975
1976        let data: ArrayRef =
1977            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1978
1979        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1980        let input =
1981            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1982
1983        let sort_exec = Arc::new(SortExec::new(
1984            [PhysicalSortExpr {
1985                expr: col("field_name", &schema)?,
1986                options: SortOptions::default(),
1987            }]
1988            .into(),
1989            input,
1990        ));
1991
1992        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1993
1994        let expected_data: ArrayRef =
1995            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1996        let expected_batch =
1997            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
1998
1999        // Data is correct
2000        assert_eq!(&vec![expected_batch], &result);
2001
2002        // explicitly ensure the metadata is present
2003        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
2004        assert_eq!(result[0].schema().metadata(), &schema_metadata);
2005
2006        Ok(())
2007    }
2008
    #[tokio::test]
    async fn test_lex_sort_by_mixed_types() -> Result<()> {
        // Lexicographic sort over a scalar column plus a List column: verifies
        // that nested types participate correctly in multi-key ordering.
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
        ]));

        // define data.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(3)]),
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                ])),
            ],
        )?;

        // Sort by `a` ascending (nulls first), then `b` descending (nulls last).
        let sort_exec = Arc::new(SortExec::new(
            [
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: false,
                    },
                },
            ]
            .into(),
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
        ));

        // The output schema must be unchanged by the sort.
        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
        assert_eq!(
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            *sort_exec.schema().field(1).data_type()
        );

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 4);
        assert_eq!(result.len(), 1);

        // Null `a` first, then a = 1; the tie on a = 2 is broken by `b`
        // descending, so [5] sorts before [3].
        let expected = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                    Some(vec![Some(3)]),
                ])),
            ],
        )?;

        assert_eq!(expected, result[0]);

        Ok(())
    }
2086
    #[tokio::test]
    async fn test_lex_sort_by_float() -> Result<()> {
        // Multi-key sort over float columns containing both NaN and null,
        // which must be ordered distinctly from each other.
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, true),
            Field::new("b", DataType::Float64, true),
        ]));

        // define data.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![
                    Some(f32::NAN),
                    None,
                    None,
                    Some(f32::NAN),
                    Some(1.0_f32),
                    Some(1.0_f32),
                    Some(2.0_f32),
                    Some(3.0_f32),
                ])),
                Arc::new(Float64Array::from(vec![
                    Some(200.0_f64),
                    Some(20.0_f64),
                    Some(10.0_f64),
                    Some(100.0_f64),
                    Some(f64::NAN),
                    None,
                    None,
                    Some(f64::NAN),
                ])),
            ],
        )?;

        // Sort by `a` descending (nulls first), then `b` ascending (nulls last).
        let sort_exec = Arc::new(SortExec::new(
            [
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                },
            ]
            .into(),
            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
        ));

        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 8);
        assert_eq!(result.len(), 1);

        let columns = result[0].columns();

        assert_eq!(DataType::Float32, *columns[0].data_type());
        assert_eq!(DataType::Float64, *columns[1].data_type());

        let a = as_primitive_array::<Float32Type>(&columns[0])?;
        let b = as_primitive_array::<Float64Type>(&columns[1])?;

        // convert result to strings to allow comparing to expected result containing NaN
        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
            .map(|i| {
                let aval = if a.is_valid(i) {
                    Some(a.value(i).to_string())
                } else {
                    None
                };
                let bval = if b.is_valid(i) {
                    Some(b.value(i).to_string())
                } else {
                    None
                };
                (aval, bval)
            })
            .collect();

        // Expected ordering for `a` descending / nulls first: the two nulls,
        // then the NaNs (NaN sorts greater than all numbers), then 3, 2, 1, 1.
        // Ties are broken by `b` ascending with nulls last.
        let expected: Vec<(Option<String>, Option<String>)> = vec![
            (None, Some("10".to_owned())),
            (None, Some("20".to_owned())),
            (Some("NaN".to_owned()), Some("100".to_owned())),
            (Some("NaN".to_owned()), Some("200".to_owned())),
            (Some("3".to_owned()), Some("NaN".to_owned())),
            (Some("2".to_owned()), None),
            (Some("1".to_owned()), Some("NaN".to_owned())),
            (Some("1".to_owned()), None),
        ];

        assert_eq!(expected, result);

        Ok(())
    }
2193
2194    #[tokio::test]
2195    async fn test_drop_cancel() -> Result<()> {
2196        let task_ctx = Arc::new(TaskContext::default());
2197        let schema =
2198            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2199
2200        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2201        let refs = blocking_exec.refs();
2202        let sort_exec = Arc::new(SortExec::new(
2203            [PhysicalSortExpr {
2204                expr: col("a", &schema)?,
2205                options: SortOptions::default(),
2206            }]
2207            .into(),
2208            blocking_exec,
2209        ));
2210
2211        let fut = collect(sort_exec, Arc::clone(&task_ctx));
2212        let mut fut = fut.boxed();
2213
2214        assert_is_pending(&mut fut);
2215        drop(fut);
2216        assert_strong_count_converges_to_zero(refs).await;
2217
2218        assert_eq!(
2219            task_ctx.runtime_env().memory_pool.reserved(),
2220            0,
2221            "The sort should have returned all memory used back to the memory manager"
2222        );
2223
2224        Ok(())
2225    }
2226
2227    #[test]
2228    fn test_empty_sort_batch() {
2229        let schema = Arc::new(Schema::empty());
2230        let options = RecordBatchOptions::new().with_row_count(Some(1));
2231        let batch =
2232            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
2233                .unwrap();
2234
2235        let expressions = [PhysicalSortExpr {
2236            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2237            options: SortOptions::default(),
2238        }]
2239        .into();
2240
2241        let result = sort_batch(&batch, &expressions, None).unwrap();
2242        assert_eq!(result.num_rows(), 1);
2243    }
2244
    #[tokio::test]
    async fn topk_unbounded_source() -> Result<()> {
        // A sort with a fetch limit over an unbounded (infinite) input must
        // still complete: only the first `fetch` rows are needed.
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
        let source = SortedUnboundedExec {
            schema: schema.clone(),
            batch_size: 2,
            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
        };
        let mut plan = SortExec::new(
            [PhysicalSortExpr::new_default(Arc::new(Column::new(
                "c1", 0,
            )))]
            .into(),
            Arc::new(source),
        );
        // Fetch 9 rows — deliberately not a multiple of the batch size of 2,
        // so the final batch must be truncated.
        plan = plan.with_fetch(Some(9));

        let batches = collect(Arc::new(plan), task_ctx).await?;
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+
        | c1 |
        +----+
        | 0  |
        | 1  |
        | 2  |
        | 3  |
        | 4  |
        | 5  |
        | 6  |
        | 7  |
        | 8  |
        +----+
        ");
        Ok(())
    }
2281
2282    #[tokio::test]
2283    async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2284        let batch_size = 100;
2285
2286        let create_task_ctx = |_: &[RecordBatch]| {
2287            TaskContext::default().with_session_config(
2288                SessionConfig::new()
2289                    .with_batch_size(batch_size)
2290                    .with_sort_in_place_threshold_bytes(usize::MAX),
2291            )
2292        };
2293
2294        // Smaller than batch size and require more than a single batch to get the requested batch size
2295        test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2296
2297        // Not evenly divisible by batch size
2298        test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2299
2300        // Evenly divisible by batch size and is larger than 2 output batches
2301        test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2302
2303        Ok(())
2304    }
2305
2306    #[tokio::test]
2307    async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place()
2308    -> Result<()> {
2309        let batch_size = 100;
2310
2311        let create_task_ctx = |_: &[RecordBatch]| {
2312            TaskContext::default().with_session_config(
2313                SessionConfig::new()
2314                    .with_batch_size(batch_size)
2315                    .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2316            )
2317        };
2318
2319        // Smaller than batch size and require more than a single batch to get the requested batch size
2320        {
2321            let metrics =
2322                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2323
2324            assert_eq!(
2325                metrics.spill_count(),
2326                Some(0),
2327                "Expected no spills when sorting in place"
2328            );
2329        }
2330
2331        // Not evenly divisible by batch size
2332        {
2333            let metrics =
2334                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2335
2336            assert_eq!(
2337                metrics.spill_count(),
2338                Some(0),
2339                "Expected no spills when sorting in place"
2340            );
2341        }
2342
2343        // Evenly divisible by batch size and is larger than 2 output batches
2344        {
2345            let metrics =
2346                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2347
2348            assert_eq!(
2349                metrics.spill_count(),
2350                Some(0),
2351                "Expected no spills when sorting in place"
2352            );
2353        }
2354
2355        Ok(())
2356    }
2357
2358    #[tokio::test]
2359    async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch()
2360    -> Result<()> {
2361        let batch_size = 100;
2362
2363        let create_task_ctx = |_: &[RecordBatch]| {
2364            TaskContext::default()
2365                .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2366        };
2367
2368        // Smaller than batch size and require more than a single batch to get the requested batch size
2369        {
2370            let metrics = test_sort_output_batch_size(
2371                // Single batch
2372                1,
2373                batch_size / 4,
2374                create_task_ctx,
2375            )
2376            .await?;
2377
2378            assert_eq!(
2379                metrics.spill_count(),
2380                Some(0),
2381                "Expected no spills when sorting in place"
2382            );
2383        }
2384
2385        // Not evenly divisible by batch size
2386        {
2387            let metrics = test_sort_output_batch_size(
2388                // Single batch
2389                1,
2390                batch_size + 7,
2391                create_task_ctx,
2392            )
2393            .await?;
2394
2395            assert_eq!(
2396                metrics.spill_count(),
2397                Some(0),
2398                "Expected no spills when sorting in place"
2399            );
2400        }
2401
2402        // Evenly divisible by batch size and is larger than 2 output batches
2403        {
2404            let metrics = test_sort_output_batch_size(
2405                // Single batch
2406                1,
2407                batch_size * 3,
2408                create_task_ctx,
2409            )
2410            .await?;
2411
2412            assert_eq!(
2413                metrics.spill_count(),
2414                Some(0),
2415                "Expected no spills when sorting in place"
2416            );
2417        }
2418
2419        Ok(())
2420    }
2421
2422    #[tokio::test]
2423    async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill()
2424    -> Result<()> {
2425        let batch_size = 100;
2426
2427        let create_task_ctx = |generated_batches: &[RecordBatch]| {
2428            let batches_memory = generated_batches
2429                .iter()
2430                .map(|b| b.get_array_memory_size())
2431                .sum::<usize>();
2432
2433            TaskContext::default()
2434                .with_session_config(
2435                    SessionConfig::new()
2436                        .with_batch_size(batch_size)
2437                        // To make sure there is no in place sorting
2438                        .with_sort_in_place_threshold_bytes(1)
2439                        .with_sort_spill_reservation_bytes(1),
2440                )
2441                .with_runtime(
2442                    RuntimeEnvBuilder::default()
2443                        .with_memory_limit(batches_memory, 1.0)
2444                        .build_arc()
2445                        .unwrap(),
2446                )
2447        };
2448
2449        // Smaller than batch size and require more than a single batch to get the requested batch size
2450        {
2451            let metrics =
2452                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2453
2454            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2455        }
2456
2457        // Not evenly divisible by batch size
2458        {
2459            let metrics =
2460                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2461
2462            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2463        }
2464
2465        // Evenly divisible by batch size and is larger than 2 batches
2466        {
2467            let metrics =
2468                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2469
2470            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2471        }
2472
2473        Ok(())
2474    }
2475
2476    async fn test_sort_output_batch_size(
2477        number_of_batches: usize,
2478        batch_size_to_generate: usize,
2479        create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2480    ) -> Result<MetricsSet> {
2481        let batches = (0..number_of_batches)
2482            .map(|_| make_partition(batch_size_to_generate as i32))
2483            .collect::<Vec<_>>();
2484        let task_ctx = create_task_ctx(batches.as_slice());
2485
2486        let expected_batch_size = task_ctx.session_config().batch_size();
2487
2488        let (mut output_batches, metrics) =
2489            run_sort_on_input(task_ctx, "i", batches).await?;
2490
2491        let last_batch = output_batches.pop().unwrap();
2492
2493        for batch in output_batches {
2494            assert_eq!(batch.num_rows(), expected_batch_size);
2495        }
2496
2497        let mut last_expected_batch_size =
2498            (batch_size_to_generate * number_of_batches) % expected_batch_size;
2499        if last_expected_batch_size == 0 {
2500            last_expected_batch_size = expected_batch_size;
2501        }
2502        assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2503
2504        Ok(metrics)
2505    }
2506
2507    async fn run_sort_on_input(
2508        task_ctx: TaskContext,
2509        order_by_col: &str,
2510        batches: Vec<RecordBatch>,
2511    ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2512        let task_ctx = Arc::new(task_ctx);
2513
2514        // let task_ctx = env.
2515        let schema = batches[0].schema();
2516        let ordering: LexOrdering = [PhysicalSortExpr {
2517            expr: col(order_by_col, &schema)?,
2518            options: SortOptions {
2519                descending: false,
2520                nulls_first: true,
2521            },
2522        }]
2523        .into();
2524        let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2525            ordering.clone(),
2526            TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2527        ));
2528
2529        let sorted_batches =
2530            collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2531
2532        let metrics = sort_exec.metrics().expect("sort have metrics");
2533
2534        // assert output
2535        {
2536            let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2537            let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2538
2539            let sorted_batches_concat =
2540                concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2541
2542            assert_eq!(sorted_input_batch, sorted_batches_concat);
2543        }
2544
2545        Ok((sorted_batches, metrics))
2546    }
2547
2548    #[tokio::test]
2549    async fn test_sort_batch_chunked_basic() -> Result<()> {
2550        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2551
2552        // Create a batch with 1000 rows
2553        let mut values: Vec<i32> = (0..1000).collect();
2554        // Shuffle to make it unsorted
2555        values.reverse();
2556
2557        let batch = RecordBatch::try_new(
2558            Arc::clone(&schema),
2559            vec![Arc::new(Int32Array::from(values))],
2560        )?;
2561
2562        let expressions: LexOrdering =
2563            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2564
2565        // Sort with batch_size = 250
2566        let result_batches = sort_batch_chunked(&batch, &expressions, 250)?;
2567
2568        // Verify 4 batches are returned
2569        assert_eq!(result_batches.len(), 4);
2570
2571        // Verify each batch has <= 250 rows
2572        let mut total_rows = 0;
2573        for (i, batch) in result_batches.iter().enumerate() {
2574            assert!(
2575                batch.num_rows() <= 250,
2576                "Batch {} has {} rows, expected <= 250",
2577                i,
2578                batch.num_rows()
2579            );
2580            total_rows += batch.num_rows();
2581        }
2582
2583        // Verify total row count matches input
2584        assert_eq!(total_rows, 1000);
2585
2586        // Verify data is correctly sorted across all chunks
2587        let concatenated = concat_batches(&schema, &result_batches)?;
2588        let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2589        for i in 0..array.len() - 1 {
2590            assert!(
2591                array.value(i) <= array.value(i + 1),
2592                "Array not sorted at position {}: {} > {}",
2593                i,
2594                array.value(i),
2595                array.value(i + 1)
2596            );
2597        }
2598        assert_eq!(array.value(0), 0);
2599        assert_eq!(array.value(array.len() - 1), 999);
2600
2601        Ok(())
2602    }
2603
2604    #[tokio::test]
2605    async fn test_sort_batch_chunked_smaller_than_batch_size() -> Result<()> {
2606        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2607
2608        // Create a batch with 50 rows
2609        let values: Vec<i32> = (0..50).rev().collect();
2610        let batch = RecordBatch::try_new(
2611            Arc::clone(&schema),
2612            vec![Arc::new(Int32Array::from(values))],
2613        )?;
2614
2615        let expressions: LexOrdering =
2616            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2617
2618        // Sort with batch_size = 100
2619        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2620
2621        // Should return exactly 1 batch
2622        assert_eq!(result_batches.len(), 1);
2623        assert_eq!(result_batches[0].num_rows(), 50);
2624
2625        // Verify it's correctly sorted
2626        let array = as_primitive_array::<Int32Type>(result_batches[0].column(0))?;
2627        for i in 0..array.len() - 1 {
2628            assert!(array.value(i) <= array.value(i + 1));
2629        }
2630        assert_eq!(array.value(0), 0);
2631        assert_eq!(array.value(49), 49);
2632
2633        Ok(())
2634    }
2635
2636    #[tokio::test]
2637    async fn test_sort_batch_chunked_exact_multiple() -> Result<()> {
2638        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2639
2640        // Create a batch with 1000 rows
2641        let values: Vec<i32> = (0..1000).rev().collect();
2642        let batch = RecordBatch::try_new(
2643            Arc::clone(&schema),
2644            vec![Arc::new(Int32Array::from(values))],
2645        )?;
2646
2647        let expressions: LexOrdering =
2648            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2649
2650        // Sort with batch_size = 100
2651        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2652
2653        // Should return exactly 10 batches of 100 rows each
2654        assert_eq!(result_batches.len(), 10);
2655        for batch in &result_batches {
2656            assert_eq!(batch.num_rows(), 100);
2657        }
2658
2659        // Verify sorted correctly across all batches
2660        let concatenated = concat_batches(&schema, &result_batches)?;
2661        let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2662        for i in 0..array.len() - 1 {
2663            assert!(array.value(i) <= array.value(i + 1));
2664        }
2665
2666        Ok(())
2667    }
2668
2669    #[tokio::test]
2670    async fn test_sort_batch_chunked_empty_batch() -> Result<()> {
2671        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2672
2673        let batch = RecordBatch::new_empty(Arc::clone(&schema));
2674
2675        let expressions: LexOrdering =
2676            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2677
2678        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2679
2680        // Empty input produces no output batches (0 chunks)
2681        assert_eq!(result_batches.len(), 0);
2682
2683        Ok(())
2684    }
2685
2686    #[tokio::test]
2687    async fn test_get_reserved_bytes_for_record_batch_with_sliced_batches() -> Result<()>
2688    {
2689        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2690
2691        // Create a larger batch then slice it
2692        let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
2693        let sliced_array = large_array.slice(100, 50); // Take 50 elements starting at 100
2694
2695        let sliced_batch =
2696            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(sliced_array)])?;
2697        let batch =
2698            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(large_array)])?;
2699
2700        let sliced_reserved = get_reserved_bytes_for_record_batch(&sliced_batch)?;
2701        let reserved = get_reserved_bytes_for_record_batch(&batch)?;
2702
2703        // The reserved memory for the sliced batch should be less than that of the full batch
2704        assert!(reserved > sliced_reserved);
2705
2706        Ok(())
2707    }
2708}