// datafusion_physical_plan/sorts/sort.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Sort that deals with an arbitrary size of the input.
//! It will do in-memory sorting if it has enough memory budget
//! but spills to disk if needed.

22use std::any::Any;
23use std::fmt;
24use std::fmt::{Debug, Formatter};
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28
29use crate::common::spawn_buffered;
30use crate::execution_plan::{
31    Boundedness, CardinalityEffect, EmissionType, has_same_children_properties,
32};
33use crate::expressions::PhysicalSortExpr;
34use crate::filter_pushdown::{
35    ChildFilterDescription, FilterDescription, FilterPushdownPhase,
36};
37use crate::limit::LimitStream;
38use crate::metrics::{
39    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
40};
41use crate::projection::{ProjectionExec, make_with_child, update_ordering};
42use crate::sorts::IncrementalSortIterator;
43use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
44use crate::spill::get_record_batch_memory_size;
45use crate::spill::in_progress_spill_file::InProgressSpillFile;
46use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
47use crate::stream::RecordBatchStreamAdapter;
48use crate::stream::ReservationStream;
49use crate::topk::TopK;
50use crate::topk::TopKDynamicFilters;
51use crate::{
52    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
53    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
54    Statistics,
55};
56
57use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
58use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
59use arrow::datatypes::SchemaRef;
60use datafusion_common::config::SpillCompression;
61use datafusion_common::{
62    DataFusionError, Result, assert_or_internal_err, internal_datafusion_err,
63    unwrap_or_internal_err,
64};
65use datafusion_execution::TaskContext;
66use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
67use datafusion_execution::runtime_env::RuntimeEnv;
68use datafusion_physical_expr::LexOrdering;
69use datafusion_physical_expr::PhysicalExpr;
70use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
71
72use futures::{StreamExt, TryStreamExt};
73use log::{debug, trace};
74
/// Bundles the metric sets recorded by [`ExternalSorter`]:
/// baseline operator metrics plus spill-specific counters.
struct ExternalSorterMetrics {
    /// Common operator metrics (output rows, elapsed compute time, ...)
    baseline: BaselineMetrics,

    /// Spill activity counters (spilled bytes, spilled rows, file count)
    spill_metrics: SpillMetrics,
}

82impl ExternalSorterMetrics {
83    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
84        Self {
85            baseline: BaselineMetrics::new(metrics, partition),
86            spill_metrics: SpillMetrics::new(metrics, partition),
87        }
88    }
89}
90
/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
/// a total order. Depending on the input size and memory manager
/// configuration, writes intermediate results to disk ("spills")
/// using Arrow IPC format.
///
/// # Algorithm
///
/// 1. get a non-empty new batch from input
///
/// 2. check with the memory manager there is sufficient space to
///    buffer the batch in memory.
///
/// 2.1 if memory is sufficient, buffer batch in memory, go to 1.
///
/// 2.2 if no more memory is available, sort all buffered batches and
///     spill to file.  buffer the next batch in memory, go to 1.
///
/// 3. when input is exhausted, merge all in memory batches and spills
///    to get a total order.
///
/// # When data fits in available memory
///
/// If there is sufficient memory, data is sorted in memory to produce the output
///
/// ```text
///    ┌─────┐
///    │  2  │
///    │  3  │
///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///    │  4  │
///    │  2  │                  │
///    └─────┘                  ▼
///    ┌─────┐
///    │  1  │              In memory
///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
///    │  1  │
///    └─────┘                  ▲
///      ...                    │
///
///    ┌─────┐                  │
///    │  4  │
///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///    └─────┘
///
/// in_mem_batches
/// ```
///
/// # When data does not fit in available memory
///
///  When memory is exhausted, data is first sorted and written to one
///  or more spill files on disk:
///
/// ```text
///    ┌─────┐                               .─────────────────.
///    │  2  │                              (                   )
///    │  3  │                              │`─────────────────'│
///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
///    │  4  │             │                │  │ 1  │░          │
///    │  2  │                              │  │... │░          │
///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
///    ┌─────┐                              │  └────┘░    1  │░ │
///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
///    │  1  │     and write to file        │           │    ░░ │
///    └─────┘                              │             4  │░ │
///      ...               ▲                │           └░─░─░░ │
///                        │                │            ░░░░░░ │
///    ┌─────┐                              │.─────────────────.│
///    │  4  │             │                (                   )
///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
///    └─────┘
///
/// in_mem_batches                                  spills
///                                         (file on disk in Arrow
///                                               IPC format)
/// ```
///
/// Once the input is completely read, the spill files are read and
/// merged with any in memory batches to produce a single total sorted
/// output:
///
/// ```text
///   .─────────────────.
///  (                   )
///  │`─────────────────'│
///  │  ┌────┐           │
///  │  │ 1  │░          │
///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
///  │  │ 4  │░ ┌────┐   │           │
///  │  └────┘░ │ 1  │░  │           ▼
///  │   ░░░░░░ │    │░  │
///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
///  │          │    │░  │
///  │          │ 4  │░  │           ▲
///  │          └────┘░  │           │
///  │           ░░░░░░  │
///  │.─────────────────.│           │
///  (                   )
///   `─────────────────'            │
///         spills
///                                  │
///
///                                  │
///
///     ┌─────┐                      │
///     │  1  │
///     │  4  │─ ─ ─ ─               │
///     └─────┘       │
///       ...                   In memory
///                   └ ─ ─ ─▶  sort/merge
///     ┌─────┐
///     │  4  │                      ▲
///     │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     └─────┘
///
///  in_mem_batches
/// ```
struct ExternalSorter {
    // ========================================================================
    // PROPERTIES:
    // Fields that define the sorter's configuration and remain constant
    // ========================================================================
    /// Schema of the output (and the input)
    schema: SchemaRef,
    /// Sort expressions
    expr: LexOrdering,
    /// The target number of rows for output batches
    batch_size: usize,
    /// If the total size of buffered in-memory batches is below this size,
    /// the data will be concatenated and sorted in place rather than
    /// sort/merged.
    sort_in_place_threshold_bytes: usize,

    // ========================================================================
    // STATE BUFFERS:
    // Fields that hold intermediate data during sorting
    // ========================================================================
    /// Unsorted input batches stored in the memory buffer
    in_mem_batches: Vec<RecordBatch>,

    /// During external sorting, in-memory intermediate data will be appended to
    /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
    ///
    /// This is a tuple of:
    /// 1. `InProgressSpillFile` - the file that is being written to
    /// 2. `max_record_batch_memory` - the maximum memory usage of a single batch in this spill file.
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    /// If data has previously been spilled, the locations of the spill files (in
    /// Arrow IPC format).
    /// Within the same spill file, the data might be chunked into multiple batches,
    /// and ordered by sort keys.
    finished_spill_files: Vec<SortedSpillFile>,

    // ========================================================================
    // EXECUTION RESOURCES:
    // Fields related to managing execution resources and monitoring performance.
    // ========================================================================
    /// Runtime metrics
    metrics: ExternalSorterMetrics,
    /// A handle to the runtime to get spill files
    runtime: Arc<RuntimeEnv>,
    /// Reservation for in_mem_batches
    reservation: MemoryReservation,
    /// Manages creation of spill files for this sort
    spill_manager: SpillManager,

    /// Reservation for the merging of in-memory batches. If the sort
    /// might spill, `sort_spill_reservation_bytes` will be
    /// pre-reserved to ensure there is some space for this sort/merge.
    merge_reservation: MemoryReservation,
    /// How much memory to reserve for performing in-memory sort/merges
    /// prior to spilling.
    sort_spill_reservation_bytes: usize,
}

265impl ExternalSorter {
    // TODO: make a builder or some other nicer API to avoid the
    // clippy warning
    /// Creates a new `ExternalSorter` for partition `partition_id`.
    ///
    /// Registers two memory consumers against `runtime`'s memory pool:
    /// a spillable reservation for buffered input batches, and a separate
    /// reservation used as headroom for in-memory sort/merges.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        // Configured via `datafusion.execution.spill_compression`.
        spill_compression: SpillCompression,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<Self> {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        // The input-buffer reservation is marked spillable, so the pool may
        // ask this consumer to free memory by spilling to disk.
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);

        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);

        let spill_manager = SpillManager::new(
            Arc::clone(&runtime),
            metrics.spill_metrics.clone(),
            Arc::clone(&schema),
        )
        .with_compression_type(spill_compression);

        Ok(Self {
            schema,
            in_mem_batches: vec![],
            in_progress_spill_file: None,
            finished_spill_files: vec![],
            expr,
            metrics,
            reservation,
            spill_manager,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        })
    }

314    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
315    ///
316    /// Updates memory usage metrics, and possibly triggers spilling to disk
317    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
318        if input.num_rows() == 0 {
319            return Ok(());
320        }
321
322        self.reserve_memory_for_merge()?;
323        self.reserve_memory_for_batch_and_maybe_spill(&input)
324            .await?;
325
326        self.in_mem_batches.push(input);
327        Ok(())
328    }
329
330    fn spilled_before(&self) -> bool {
331        !self.finished_spill_files.is_empty()
332    }
333
334    /// Returns the final sorted output of all batches inserted via
335    /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
336    ///
337    /// This process could either be:
338    ///
339    /// 1. An in-memory sort/merge (if the input fit in memory)
340    ///
341    /// 2. A combined streaming merge incorporating both in-memory
342    ///    batches and data from spill files on disk.
343    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
344        // Release the memory reserved for merge back to the pool so
345        // there is some left when `in_mem_sort_stream` requests an
346        // allocation.
347        self.merge_reservation.free();
348
349        if self.spilled_before() {
350            // Sort `in_mem_batches` and spill it first. If there are many
351            // `in_mem_batches` and the memory limit is almost reached, merging
352            // them with the spilled files at the same time might cause OOM.
353            if !self.in_mem_batches.is_empty() {
354                self.sort_and_spill_in_mem_batches().await?;
355            }
356
357            StreamingMergeBuilder::new()
358                .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
359                .with_spill_manager(self.spill_manager.clone())
360                .with_schema(Arc::clone(&self.schema))
361                .with_expressions(&self.expr.clone())
362                .with_metrics(self.metrics.baseline.clone())
363                .with_batch_size(self.batch_size)
364                .with_fetch(None)
365                .with_reservation(self.merge_reservation.new_empty())
366                .build()
367        } else {
368            self.in_mem_sort_stream(self.metrics.baseline.clone())
369        }
370    }
371
    /// How much memory is currently reserved for buffered batches
    /// in this `ExternalSorter`?
    fn used(&self) -> usize {
        self.reservation.size()
    }

    /// How many bytes have been spilled to disk?
    fn spilled_bytes(&self) -> usize {
        self.metrics.spill_metrics.spilled_bytes.value()
    }

    /// How many rows have been spilled to disk?
    fn spilled_rows(&self) -> usize {
        self.metrics.spill_metrics.spilled_rows.value()
    }

    /// How many spill files have been created?
    fn spill_count(&self) -> usize {
        self.metrics.spill_metrics.spill_file_count.value()
    }

    /// Appends globally sorted batches to the in-progress spill file, and clears
    /// the `globally_sorted_batches` (also its memory reservation) afterwards.
    ///
    /// Lazily creates the in-progress spill file on first use; the file is
    /// finalized later by [`Self::spill_finish`].
    async fn consume_and_spill_append(
        &mut self,
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        // Nothing to write
        if globally_sorted_batches.is_empty() {
            return Ok(());
        }

        // Lazily initialize the in-progress spill file
        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
        }

        // Compact `StringViewArray` payload buffers so each batch only writes
        // its own data to disk (see `organize_stringview_arrays`).
        Self::organize_stringview_arrays(globally_sorted_batches)?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let batches_to_spill = std::mem::take(globally_sorted_batches);
        // The batches are moving from memory to disk, so release their
        // reservation back to the pool.
        self.reservation.free();

        let (in_progress_file, max_record_batch_size) =
            self.in_progress_spill_file.as_mut().ok_or_else(|| {
                internal_datafusion_err!("In-progress spill file should be initialized")
            })?;

        for batch in batches_to_spill {
            in_progress_file.append_batch(&batch)?;

            // Track the largest single batch written to this file; recorded
            // in `SortedSpillFile::max_record_batch_memory` by `spill_finish`
            // (presumably used to budget memory when reading back — see
            // `SortedSpillFile`).
            *max_record_batch_size =
                (*max_record_batch_size).max(batch.get_sliced_size()?);
        }

        assert_or_internal_err!(
            globally_sorted_batches.is_empty(),
            "This function consumes globally_sorted_batches, so it should be empty after taking."
        );

        Ok(())
    }

435    /// Finishes the in-progress spill file and moves it to the finished spill files.
436    async fn spill_finish(&mut self) -> Result<()> {
437        let (mut in_progress_file, max_record_batch_memory) =
438            self.in_progress_spill_file.take().ok_or_else(|| {
439                internal_datafusion_err!("Should be called after `spill_append`")
440            })?;
441        let spill_file = in_progress_file.finish()?;
442
443        if let Some(spill_file) = spill_file {
444            self.finished_spill_files.push(SortedSpillFile {
445                file: spill_file,
446                max_record_batch_memory,
447            });
448        }
449
450        Ok(())
451    }
452
453    /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
454    /// `StringViewArray` in sequential order by calling `gc()` on them.
455    ///
456    /// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
457    /// available
458    ///
459    /// # Rationale
460    /// After (merge-based) sorting, all batches will be sorted into a single run,
461    /// but physically this sorted run is chunked into many small batches. For
462    /// `StringViewArray`s inside each sorted run, their inner buffers are not
463    /// re-constructed by default, leading to non-sequential payload locations
464    /// (permutated by `interleave()` Arrow kernel). A single payload buffer might
465    /// be shared by multiple `RecordBatch`es.
466    /// When writing each batch to disk, the writer has to write all referenced buffers,
467    /// because they have to be read back one by one to reduce memory usage. This
468    /// causes extra disk reads and writes, and potentially execution failure.
469    ///
470    /// # Example
471    /// Before sorting:
472    /// batch1 -> buffer1
473    /// batch2 -> buffer2
474    ///
475    /// sorted_batch1 -> buffer1
476    ///               -> buffer2
477    /// sorted_batch2 -> buffer1
478    ///               -> buffer2
479    ///
480    /// Then when spilling each batch, the writer has to write all referenced buffers
481    /// repeatedly.
482    fn organize_stringview_arrays(
483        globally_sorted_batches: &mut Vec<RecordBatch>,
484    ) -> Result<()> {
485        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
486
487        for batch in globally_sorted_batches.drain(..) {
488            let mut new_columns: Vec<Arc<dyn Array>> =
489                Vec::with_capacity(batch.num_columns());
490
491            let mut arr_mutated = false;
492            for array in batch.columns() {
493                if let Some(string_view_array) =
494                    array.as_any().downcast_ref::<StringViewArray>()
495                {
496                    let new_array = string_view_array.gc();
497                    new_columns.push(Arc::new(new_array));
498                    arr_mutated = true;
499                } else {
500                    new_columns.push(Arc::clone(array));
501                }
502            }
503
504            let organized_batch = if arr_mutated {
505                RecordBatch::try_new(batch.schema(), new_columns)?
506            } else {
507                batch
508            };
509
510            organized_batches.push(organized_batch);
511        }
512
513        *globally_sorted_batches = organized_batches;
514
515        Ok(())
516    }
517
    /// Sorts the in-memory batches and merges them into a single sorted run, then writes
    /// the result to spill files.
    ///
    /// On success, `in_mem_batches` is empty, their reservation is released,
    /// and merge headroom is re-reserved for the next round.
    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
        assert_or_internal_err!(
            !self.in_mem_batches.is_empty(),
            "in_mem_batches must not be empty when attempting to sort and spill"
        );

        // Release the memory reserved for merge back to the pool so
        // there is some left when `in_mem_sort_stream` requests an
        // allocation. At the end of this function, memory will be
        // reserved again for the next spill.
        self.merge_reservation.free();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` are
        // taken to construct a globally sorted stream.
        assert_or_internal_err!(
            self.in_mem_batches.is_empty(),
            "in_mem_batches should be empty after constructing sorted stream"
        );
        // 'global' here refers to all buffered batches when the memory limit is
        // reached. This variable will buffer the sorted batches after
        // sort-preserving merge and incrementally append to spill files.
        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_bytes_for_record_batch(&batch)?;
            if self.reservation.try_grow(sorted_size).is_err() {
                // Although the reservation is not enough, the batch is
                // already in memory, so it's okay to combine it with previously
                // sorted batches, and spill together.
                globally_sorted_batches.push(batch);
                self.consume_and_spill_append(&mut globally_sorted_batches)
                    .await?; // reservation is freed in spill()
            } else {
                globally_sorted_batches.push(batch);
            }
        }

        // Drop early to free up memory reserved by the sorted stream, otherwise the
        // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
        drop(sorted_stream);

        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?;
        self.spill_finish().await?;

        // Sanity check after spilling
        let buffers_cleared_property =
            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
        assert_or_internal_err!(
            buffers_cleared_property,
            "in_mem_batches and globally_sorted_batches should be cleared before"
        );

        // Reserve headroom for next sort/merge
        self.reserve_memory_for_merge()?;

        Ok(())
    }

582    /// Consumes in_mem_batches returning a sorted stream of
583    /// batches. This proceeds in one of two ways:
584    ///
585    /// # Small Datasets
586    ///
587    /// For "smaller" datasets, the data is first concatenated into a
588    /// single batch and then sorted. This is often faster than
589    /// sorting and then merging.
590    ///
591    /// ```text
592    ///        ┌─────┐
593    ///        │  2  │
594    ///        │  3  │
595    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
596    ///        │  4  │                     │  2  │
597    ///        │  2  │        │            │  3  │
598    ///        └─────┘                     │  1  │             sorted output
599    ///        ┌─────┐        ▼            │  4  │                stream
600    ///        │  1  │                     │  2  │
601    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
602    ///        │  1  │                     │  4  │
603    ///        └─────┘        ▲            │  1  │
604    ///          ...          │            │ ... │
605    ///                                    │  4  │
606    ///        ┌─────┐        │            │  3  │
607    ///        │  4  │                     └─────┘
608    ///        │  3  │─ ─ ─ ─ ┘
609    ///        └─────┘
610    ///     in_mem_batches
611    /// ```
612    ///
613    /// # Larger datasets
614    ///
615    /// For larger datasets, the batches are first sorted individually
616    /// and then merged together.
617    ///
618    /// ```text
619    ///      ┌─────┐                ┌─────┐
620    ///      │  2  │                │  1  │
621    ///      │  3  │                │  2  │
622    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
623    ///      │  4  │                │  3  │
624    ///      │  2  │                │  4  │          │
625    ///      └─────┘                └─────┘               sorted output
626    ///      ┌─────┐                ┌─────┐          ▼       stream
627    ///      │  1  │                │  1  │
628    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
629    ///      │  1  │                │  4  │
630    ///      └─────┘                └─────┘          ▲
631    ///        ...       ...         ...             │
632    ///
633    ///      ┌─────┐                ┌─────┐          │
634    ///      │  4  │                │  3  │
635    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
636    ///      └─────┘                └─────┘
637    ///
638    ///   in_mem_batches
639    /// ```
640    fn in_mem_sort_stream(
641        &mut self,
642        metrics: BaselineMetrics,
643    ) -> Result<SendableRecordBatchStream> {
644        if self.in_mem_batches.is_empty() {
645            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
646                &self.schema,
647            ))));
648        }
649
650        // The elapsed compute timer is updated when the value is dropped.
651        // There is no need for an explicit call to drop.
652        let elapsed_compute = metrics.elapsed_compute().clone();
653        let _timer = elapsed_compute.timer();
654
655        // Please pay attention that any operation inside of `in_mem_sort_stream` will
656        // not perform any memory reservation. This is for avoiding the need of handling
657        // reservation failure and spilling in the middle of the sort/merge. The memory
658        // space for batches produced by the resulting stream will be reserved by the
659        // consumer of the stream.
660
661        if self.in_mem_batches.len() == 1 {
662            let batch = self.in_mem_batches.swap_remove(0);
663            let reservation = self.reservation.take();
664            return self.sort_batch_stream(batch, &metrics, reservation);
665        }
666
667        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
668        if self.reservation.size() < self.sort_in_place_threshold_bytes {
669            // Concatenate memory batches together and sort
670            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
671            self.in_mem_batches.clear();
672            self.reservation
673                .try_resize(get_reserved_bytes_for_record_batch(&batch)?)
674                .map_err(Self::err_with_oom_context)?;
675            let reservation = self.reservation.take();
676            return self.sort_batch_stream(batch, &metrics, reservation);
677        }
678
679        let streams = std::mem::take(&mut self.in_mem_batches)
680            .into_iter()
681            .map(|batch| {
682                let metrics = self.metrics.baseline.intermediate();
683                let reservation = self
684                    .reservation
685                    .split(get_reserved_bytes_for_record_batch(&batch)?);
686                let input = self.sort_batch_stream(batch, &metrics, reservation)?;
687                Ok(spawn_buffered(input, 1))
688            })
689            .collect::<Result<_>>()?;
690
691        StreamingMergeBuilder::new()
692            .with_streams(streams)
693            .with_schema(Arc::clone(&self.schema))
694            .with_expressions(&self.expr.clone())
695            .with_metrics(metrics)
696            .with_batch_size(self.batch_size)
697            .with_fetch(None)
698            .with_reservation(self.merge_reservation.new_empty())
699            .build()
700    }
701
    /// Sorts a single `RecordBatch` into a single stream.
    ///
    /// This may output multiple batches depending on the size of the
    /// sorted data and the target batch size.
    /// For single-batch output cases, `reservation` will be freed immediately after sorting,
    /// as the batch will be output and is expected to be reserved by the consumer of the stream.
    /// For multi-batch output cases, `reservation` will be grown to match the actual
    /// size of sorted output, and as each batch is output, its memory will be freed from the reservation.
    /// (This leads to the same behaviour, as futures are only evaluated when polled by the consumer.)
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: &BaselineMetrics,
        reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        // Invariant: the caller hands over a reservation sized exactly for `batch`
        assert_eq!(
            get_reserved_bytes_for_record_batch(&batch)?,
            reservation.size()
        );

        let schema = batch.schema();
        let expressions = self.expr.clone();
        let batch_size = self.batch_size;
        let output_row_metrics = metrics.output_rows().clone();

        // The sort itself runs lazily, on the first poll of the returned stream
        let stream = futures::stream::once(async move {
            let schema = batch.schema();

            // Sort the batch immediately and get all output batches
            let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;

            // Resize the reservation to match the actual sorted output size.
            // Using try_resize avoids a release-then-reacquire cycle, which
            // matters for MemoryPool implementations where grow/shrink have
            // non-trivial cost (e.g. JNI calls in Comet).
            let total_sorted_size: usize = sorted_batches
                .iter()
                .map(get_record_batch_memory_size)
                .sum();
            reservation
                .try_resize(total_sorted_size)
                .map_err(Self::err_with_oom_context)?;

            // Wrap in ReservationStream to hold the reservation
            Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
                Arc::clone(&schema),
                Box::pin(RecordBatchStreamAdapter::new(
                    Arc::clone(&schema),
                    futures::stream::iter(sorted_batches.into_iter().map(Ok)),
                )),
                reservation,
            )) as SendableRecordBatchStream)
        })
        .try_flatten()
        // Count output rows as batches flow to the consumer
        .map(move |batch| match batch {
            Ok(batch) => {
                output_row_metrics.add(batch.num_rows());
                Ok(batch)
            }
            Err(e) => Err(e),
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

767    /// If this sort may spill, pre-allocates
768    /// `sort_spill_reservation_bytes` of memory to guarantee memory
769    /// left for the in memory sort/merge.
770    fn reserve_memory_for_merge(&mut self) -> Result<()> {
771        // Reserve headroom for next merge sort
772        if self.runtime.disk_manager.tmp_files_enabled() {
773            let size = self.sort_spill_reservation_bytes;
774            if self.merge_reservation.size() != size {
775                self.merge_reservation
776                    .try_resize(size)
777                    .map_err(Self::err_with_oom_context)?;
778            }
779        }
780
781        Ok(())
782    }
783
784    /// Reserves memory to be able to accommodate the given batch.
785    /// If memory is scarce, tries to spill current in-memory batches to disk first.
786    async fn reserve_memory_for_batch_and_maybe_spill(
787        &mut self,
788        input: &RecordBatch,
789    ) -> Result<()> {
790        let size = get_reserved_bytes_for_record_batch(input)?;
791
792        match self.reservation.try_grow(size) {
793            Ok(_) => Ok(()),
794            Err(e) => {
795                if self.in_mem_batches.is_empty() {
796                    return Err(Self::err_with_oom_context(e));
797                }
798
799                // Spill and try again.
800                self.sort_and_spill_in_mem_batches().await?;
801                self.reservation
802                    .try_grow(size)
803                    .map_err(Self::err_with_oom_context)
804            }
805        }
806    }
807
808    /// Wraps the error with a context message suggesting settings to tweak.
809    /// This is meant to be used with DataFusionError::ResourcesExhausted only.
810    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
811        match e {
812            DataFusionError::ResourcesExhausted(_) => e.context(
813                "Not enough memory to continue external sort. \
814                    Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', \
815                    or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'."
816            ),
817            // This is not an OOM error, so just return it as is.
818            _ => e,
819        }
820    }
821}
822
/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
/// creating sorted copies of sorted columns in record batches for speeding up comparison
/// in sorting and merging. The sorted copies are in either row format or array format.
/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
/// sorted copies are, they will use more memory than the original record batch.
///
/// This can basically be calculated as the sum of the actual space it takes in
/// memory (which would be larger for a sliced batch), and the size of the actual data.
pub(crate) fn get_reserved_bytes_for_record_batch_size(
    record_batch_size: usize,
    sliced_size: usize,
) -> usize {
    // The estimate is `record_batch_size + sliced_size` (roughly 2x when the
    // batch is unsliced). Even that may not be enough for some cases, but it's
    // a good enough estimation as a baseline. If it is not enough, the user
    // can set a larger value for `sort_spill_reservation_bytes` to compensate
    // for the extra memory needed.
    //
    // `saturating_add` avoids a debug-mode overflow panic if the two sizes are
    // ever corrupt/extreme; saturating at `usize::MAX` simply makes the
    // reservation fail cleanly instead.
    record_batch_size.saturating_add(sliced_size)
}
842
843/// Estimate how much memory is needed to sort a `RecordBatch`.
844/// This will just call `get_reserved_bytes_for_record_batch_size` with the
845/// memory size of the record batch and its sliced size.
846pub(crate) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result<usize> {
847    batch.get_sliced_size().map(|sliced_size| {
848        get_reserved_bytes_for_record_batch_size(
849            get_record_batch_memory_size(batch),
850            sliced_size,
851        )
852    })
853}
854
impl Debug for ExternalSorter {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // Only surface the memory/spill accounting counters; the buffered
        // batches and sort expressions are not useful in debug output.
        f.debug_struct("ExternalSorter")
            .field("memory_used", &self.used())
            .field("spilled_bytes", &self.spilled_bytes())
            .field("spilled_rows", &self.spilled_rows())
            .field("spill_count", &self.spill_count())
            .finish()
    }
}
865
866pub fn sort_batch(
867    batch: &RecordBatch,
868    expressions: &LexOrdering,
869    fetch: Option<usize>,
870) -> Result<RecordBatch> {
871    let sort_columns = expressions
872        .iter()
873        .map(|expr| expr.evaluate_to_sort_column(batch))
874        .collect::<Result<Vec<_>>>()?;
875
876    let indices = lexsort_to_indices(&sort_columns, fetch)?;
877    let columns = take_arrays(batch.columns(), &indices, None)?;
878
879    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
880    Ok(RecordBatch::try_new_with_options(
881        batch.schema(),
882        columns,
883        &options,
884    )?)
885}
886
887/// Sort a batch and return the result as multiple batches of size `batch_size`.
888/// This is useful when you want to avoid creating one large sorted batch in memory,
889/// and instead want to process the sorted data in smaller chunks.
890pub fn sort_batch_chunked(
891    batch: &RecordBatch,
892    expressions: &LexOrdering,
893    batch_size: usize,
894) -> Result<Vec<RecordBatch>> {
895    IncrementalSortIterator::new(batch.clone(), expressions.clone(), batch_size).collect()
896}
897
/// Sort execution plan.
///
/// Support sorting datasets that are larger than the memory allotted
/// by the memory manager, by spilling to disk.
///
/// When a `fetch` limit is set, execution may use a TopK operator
/// instead of a full sort (see `execute`).
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input execution plan
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions
    expr: LexOrdering,
    /// Containing all metrics set created during sort
    metrics_set: ExecutionPlanMetricsSet,
    /// Preserve partitions of input plan. If false, the input partitions
    /// will be sorted and merged into a single output partition.
    preserve_partitioning: bool,
    /// Fetch highest/lowest n results
    fetch: Option<usize>,
    /// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
    common_sort_prefix: Vec<PhysicalSortExpr>,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: Arc<PlanProperties>,
    /// Filter matching the state of the sort for dynamic filter pushdown.
    /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
    /// If `fetch` is `None`, this will be `None`.
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
924
925impl SortExec {
926    /// Create a new sort execution plan that produces a single,
927    /// sorted output partition.
928    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
929        let preserve_partitioning = false;
930        let (cache, sort_prefix) =
931            Self::compute_properties(&input, expr.clone(), preserve_partitioning)
932                .unwrap();
933        Self {
934            expr,
935            input,
936            metrics_set: ExecutionPlanMetricsSet::new(),
937            preserve_partitioning,
938            fetch: None,
939            common_sort_prefix: sort_prefix,
940            cache: Arc::new(cache),
941            filter: None,
942        }
943    }
944
945    /// Whether this `SortExec` preserves partitioning of the children
946    pub fn preserve_partitioning(&self) -> bool {
947        self.preserve_partitioning
948    }
949
950    /// Specify the partitioning behavior of this sort exec
951    ///
952    /// If `preserve_partitioning` is true, sorts each partition
953    /// individually, producing one sorted stream for each input partition.
954    ///
955    /// If `preserve_partitioning` is false, sorts and merges all
956    /// input partitions producing a single, sorted partition.
957    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
958        self.preserve_partitioning = preserve_partitioning;
959        Arc::make_mut(&mut self.cache).partitioning =
960            Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
961        self
962    }
963
964    /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
965    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
966        let children = self
967            .expr
968            .iter()
969            .map(|sort_expr| Arc::clone(&sort_expr.expr))
970            .collect::<Vec<_>>();
971        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
972            DynamicFilterPhysicalExpr::new(children, lit(true)),
973        ))))
974    }
975
976    fn cloned(&self) -> Self {
977        SortExec {
978            input: Arc::clone(&self.input),
979            expr: self.expr.clone(),
980            metrics_set: self.metrics_set.clone(),
981            preserve_partitioning: self.preserve_partitioning,
982            common_sort_prefix: self.common_sort_prefix.clone(),
983            fetch: self.fetch,
984            cache: Arc::clone(&self.cache),
985            filter: self.filter.clone(),
986        }
987    }
988
989    /// Modify how many rows to include in the result
990    ///
991    /// If None, then all rows will be returned, in sorted order.
992    /// If Some, then only the top `fetch` rows will be returned.
993    /// This can reduce the memory pressure required by the sort
994    /// operation since rows that are not going to be included
995    /// can be dropped.
996    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
997        let mut cache = PlanProperties::clone(&self.cache);
998        // If the SortExec can emit incrementally (that means the sort requirements
999        // and properties of the input match), the SortExec can generate its result
1000        // without scanning the entire input when a fetch value exists.
1001        let is_pipeline_friendly = matches!(
1002            cache.emission_type,
1003            EmissionType::Incremental | EmissionType::Both
1004        );
1005        if fetch.is_some() && is_pipeline_friendly {
1006            cache = cache.with_boundedness(Boundedness::Bounded);
1007        }
1008        let filter = fetch.is_some().then(|| {
1009            // If we already have a filter, keep it. Otherwise, create a new one.
1010            self.filter.clone().unwrap_or_else(|| self.create_filter())
1011        });
1012        let mut new_sort = self.cloned();
1013        new_sort.fetch = fetch;
1014        new_sort.cache = cache.into();
1015        new_sort.filter = filter;
1016        new_sort
1017    }
1018
1019    /// Input schema
1020    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
1021        &self.input
1022    }
1023
1024    /// Sort expressions
1025    pub fn expr(&self) -> &LexOrdering {
1026        &self.expr
1027    }
1028
1029    /// If `Some(fetch)`, limits output to only the first "fetch" items
1030    pub fn fetch(&self) -> Option<usize> {
1031        self.fetch
1032    }
1033
1034    fn output_partitioning_helper(
1035        input: &Arc<dyn ExecutionPlan>,
1036        preserve_partitioning: bool,
1037    ) -> Partitioning {
1038        // Get output partitioning:
1039        if preserve_partitioning {
1040            input.output_partitioning().clone()
1041        } else {
1042            Partitioning::UnknownPartitioning(1)
1043        }
1044    }
1045
1046    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
1047    /// It also returns the common sort prefix between the input and the sort expressions.
1048    fn compute_properties(
1049        input: &Arc<dyn ExecutionPlan>,
1050        sort_exprs: LexOrdering,
1051        preserve_partitioning: bool,
1052    ) -> Result<(PlanProperties, Vec<PhysicalSortExpr>)> {
1053        let (sort_prefix, sort_satisfied) = input
1054            .equivalence_properties()
1055            .extract_common_sort_prefix(sort_exprs.clone())?;
1056
1057        // The emission type depends on whether the input is already sorted:
1058        // - If already fully sorted, we can emit results in the same way as the input
1059        // - If not sorted, we must wait until all data is processed to emit results (Final)
1060        let emission_type = if sort_satisfied {
1061            input.pipeline_behavior()
1062        } else {
1063            EmissionType::Final
1064        };
1065
1066        // The boundedness depends on whether the input is already sorted:
1067        // - If already sorted, we have the same property as the input
1068        // - If not sorted and input is unbounded, we require infinite memory and generates
1069        //   unbounded data (not practical).
1070        // - If not sorted and input is bounded, then the SortExec is bounded, too.
1071        let boundedness = if sort_satisfied {
1072            input.boundedness()
1073        } else {
1074            match input.boundedness() {
1075                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
1076                    requires_infinite_memory: true,
1077                },
1078                bounded => bounded,
1079            }
1080        };
1081
1082        // Calculate equivalence properties; i.e. reset the ordering equivalence
1083        // class with the new ordering:
1084        let mut eq_properties = input.equivalence_properties().clone();
1085        eq_properties.reorder(sort_exprs)?;
1086
1087        // Get output partitioning:
1088        let output_partitioning =
1089            Self::output_partitioning_helper(input, preserve_partitioning);
1090
1091        Ok((
1092            PlanProperties::new(
1093                eq_properties,
1094                output_partitioning,
1095                emission_type,
1096                boundedness,
1097            ),
1098            sort_prefix,
1099        ))
1100    }
1101}
1102
1103impl DisplayAs for SortExec {
1104    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1105        match t {
1106            DisplayFormatType::Default | DisplayFormatType::Verbose => {
1107                let preserve_partitioning = self.preserve_partitioning;
1108                match self.fetch {
1109                    Some(fetch) => {
1110                        write!(
1111                            f,
1112                            "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
1113                            self.expr
1114                        )?;
1115                        if let Some(filter) = &self.filter
1116                            && let Ok(current) = filter.read().expr().current()
1117                            && !current.eq(&lit(true))
1118                        {
1119                            write!(f, ", filter=[{current}]")?;
1120                        }
1121                        if !self.common_sort_prefix.is_empty() {
1122                            write!(f, ", sort_prefix=[")?;
1123                            let mut first = true;
1124                            for sort_expr in &self.common_sort_prefix {
1125                                if first {
1126                                    first = false;
1127                                } else {
1128                                    write!(f, ", ")?;
1129                                }
1130                                write!(f, "{sort_expr}")?;
1131                            }
1132                            write!(f, "]")
1133                        } else {
1134                            Ok(())
1135                        }
1136                    }
1137                    None => write!(
1138                        f,
1139                        "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]",
1140                        self.expr
1141                    ),
1142                }
1143            }
1144            DisplayFormatType::TreeRender => match self.fetch {
1145                Some(fetch) => {
1146                    writeln!(f, "{}", self.expr)?;
1147                    writeln!(f, "limit={fetch}")
1148                }
1149                None => {
1150                    writeln!(f, "{}", self.expr)
1151                }
1152            },
1153        }
1154    }
1155}
1156
impl ExecutionPlan for SortExec {
    fn name(&self) -> &'static str {
        // Distinguish the TopK variant so plan displays are easier to read.
        match self.fetch {
            Some(_) => "SortExec(TopK)",
            None => "SortExec",
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            // Per-partition sort: any input distribution works.
            vec![Distribution::UnspecifiedDistribution]
        } else {
            // global sort
            // TODO support RangePartition and OrderedDistribution
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut new_sort = self.cloned();
        assert_eq!(children.len(), 1, "SortExec should have exactly one child");
        new_sort.input = Arc::clone(&children[0]);

        if !has_same_children_properties(self.as_ref(), &children)? {
            // Recompute the properties based on the new input since they may have changed
            let (cache, sort_prefix) = Self::compute_properties(
                &new_sort.input,
                new_sort.expr.clone(),
                new_sort.preserve_partitioning,
            )?;
            new_sort.cache = Arc::new(cache);
            new_sort.common_sort_prefix = sort_prefix;
        }

        Ok(Arc::new(new_sort))
    }

    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        let new_sort = self.with_new_children(children)?;
        let mut new_sort = new_sort
            .as_any()
            .downcast_ref::<SortExec>()
            .expect("cloned 1 lines above this line, we know the type")
            .clone();
        // Our dynamic filter and execution metrics are the state we need to reset.
        new_sort.filter = Some(new_sort.create_filter());
        new_sort.metrics_set = ExecutionPlanMetricsSet::new();

        Ok(Arc::new(new_sort))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start SortExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {partition}");

        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy(self.expr.clone())?;

        // Dispatch on (input already sorted?, fetch limit present?):
        // - sorted + fetch: just apply a limit to the input stream
        // - sorted, no fetch: pass the input through unchanged
        // - unsorted + fetch: run the TopK operator
        // - unsorted, no fetch: full (possibly spilling) external sort
        match (sort_satisfied, self.fetch.as_ref()) {
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            (true, None) => Ok(input),
            (false, Some(fetch)) => {
                let filter = self.filter.clone();
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.common_sort_prefix.clone(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                    Arc::clone(&unwrap_or_internal_err!(filter)),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        // Feed the whole input into TopK; it may signal early
                        // completion via `finished` (e.g. sort-prefix cutoff).
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                            if topk.finished {
                                break;
                            }
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    context.session_config().spill_compression(),
                    &self.metrics_set,
                    context.runtime_env(),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        // Buffer (and possibly spill) all input, then emit the
                        // fully sorted stream.
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort().await
                    })
                    .try_flatten(),
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        // Without preserved partitioning the output is a single merged
        // partition, so statistics come from the whole input.
        if !self.preserve_partitioning() {
            return self
                .input
                .partition_statistics(None)?
                .with_fetch(self.fetch, 0, 1);
        }
        self.input
            .partition_statistics(partition)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        // Sorting alone keeps the row count; a fetch limit may reduce it.
        if self.fetch.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    /// Tries to swap the projection with its input [`SortExec`]. If it can be done,
    /// it returns the new swapped version having the [`SortExec`] as the top plan.
    /// Otherwise, it returns None.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If the projection does not narrow the schema, we should not try to push it down.
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }

        let Some(updated_exprs) = update_ordering(self.expr.clone(), projection.expr())?
        else {
            return Ok(None);
        };

        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }

    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &datafusion_common::config::ConfigOptions,
    ) -> Result<FilterDescription> {
        if phase != FilterPushdownPhase::Post {
            return FilterDescription::from_children(parent_filters, &self.children());
        }

        let mut child =
            ChildFilterDescription::from_child(&parent_filters, self.input())?;

        // Attach the TopK dynamic filter as a self-filter when enabled.
        if let Some(filter) = &self.filter
            && config.optimizer.enable_topk_dynamic_filter_pushdown
        {
            child = child.with_self_filter(filter.read().expr());
        }

        Ok(FilterDescription::new().with_child(child))
    }
}
1392
1393#[cfg(test)]
1394mod tests {
1395    use std::collections::HashMap;
1396    use std::pin::Pin;
1397    use std::task::{Context, Poll};
1398
1399    use super::*;
1400    use crate::coalesce_partitions::CoalescePartitionsExec;
1401    use crate::collect;
1402    use crate::execution_plan::Boundedness;
1403    use crate::expressions::col;
1404    use crate::test;
1405    use crate::test::TestMemoryExec;
1406    use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
1407    use crate::test::{assert_is_pending, make_partition};
1408
1409    use arrow::array::*;
1410    use arrow::compute::SortOptions;
1411    use arrow::datatypes::*;
1412    use datafusion_common::cast::as_primitive_array;
1413    use datafusion_common::test_util::batches_to_string;
1414    use datafusion_common::{DataFusionError, Result, ScalarValue};
1415    use datafusion_execution::RecordBatchStream;
1416    use datafusion_execution::config::SessionConfig;
1417    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1418    use datafusion_physical_expr::EquivalenceProperties;
1419    use datafusion_physical_expr::expressions::{Column, Literal};
1420
1421    use futures::{FutureExt, Stream};
1422    use insta::assert_snapshot;
1423
    /// Test plan that produces an endless stream of batches already ordered
    /// on column `c1` (see `compute_properties`).
    #[derive(Debug, Clone)]
    pub struct SortedUnboundedExec {
        schema: Schema,
        // Number of rows emitted per generated batch.
        batch_size: u64,
        cache: Arc<PlanProperties>,
    }
1430
1431    impl DisplayAs for SortedUnboundedExec {
1432        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
1433            match t {
1434                DisplayFormatType::Default
1435                | DisplayFormatType::Verbose
1436                | DisplayFormatType::TreeRender => write!(f, "UnboundableExec",).unwrap(),
1437            }
1438            Ok(())
1439        }
1440    }
1441
    impl SortedUnboundedExec {
        /// Plan properties advertising an existing ordering on `c1`, a single
        /// partition, and unbounded output that does not require infinite
        /// memory.
        fn compute_properties(schema: SchemaRef) -> PlanProperties {
            let mut eq_properties = EquivalenceProperties::new(schema);
            eq_properties.add_ordering([PhysicalSortExpr::new_default(Arc::new(
                Column::new("c1", 0),
            ))]);
            PlanProperties::new(
                eq_properties,
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            )
        }
    }
1458
    impl ExecutionPlan for SortedUnboundedExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            &self.cache
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            // Leaf node: generates its own data, no inputs.
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            // No children to replace, so return the plan unchanged.
            Ok(self)
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            // Each call starts a fresh counter stream at offset 0.
            Ok(Box::pin(SortedUnboundedStream {
                schema: Arc::new(self.schema.clone()),
                batch_size: self.batch_size,
                offset: 0,
            }))
        }
    }
1495
    /// Infinite stream of single-column `u64` batches with strictly
    /// increasing values (see `create_record_batch`).
    #[derive(Debug)]
    pub struct SortedUnboundedStream {
        schema: SchemaRef,
        // Rows per emitted batch.
        batch_size: u64,
        // Value of the first row of the next batch; advanced on every poll.
        offset: u64,
    }
1502
    impl Stream for SortedUnboundedStream {
        type Item = Result<RecordBatch>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // Always ready with the next batch and never returns `None`:
            // this models an unbounded input.
            let batch = SortedUnboundedStream::create_record_batch(
                Arc::clone(&self.schema),
                self.offset,
                self.batch_size,
            );
            self.offset += self.batch_size;
            Poll::Ready(Some(Ok(batch)))
        }
    }
1519
    impl RecordBatchStream for SortedUnboundedStream {
        // Every emitted batch shares this single schema.
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }
1525
1526    impl SortedUnboundedStream {
1527        fn create_record_batch(
1528            schema: SchemaRef,
1529            offset: u64,
1530            batch_size: u64,
1531        ) -> RecordBatch {
1532            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1533            let array = UInt64Array::from(values);
1534            let array_ref: ArrayRef = Arc::new(array);
1535            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1536        }
1537    }
1538
    #[tokio::test]
    async fn test_in_mem_sort() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let partitions = 4;
        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        // Global sort on column "i" over all input partitions merged into one.
        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(csv)),
        ));

        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;

        // All 400 input rows come back in a single sorted batch.
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 400);
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1567
    #[tokio::test]
    async fn test_sort_spill() -> Result<()> {
        // trigger spill w/ 100 batches
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;
        // Cap the memory pool only slightly above the merge reservation so
        // buffered batches cannot all fit and the sorter is forced to spill.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // The input has 100 partitions, each partition has a batch containing 100 rows.
        // Each row has a single Int32 column with values 0..100. The total size of the
        // input is roughly 40000 bytes.
        let partitions = 100;
        let input = test::scan_partitioned(partitions);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        assert_eq!(result.len(), 2);

        // Now, validate metrics
        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 10000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();
        // Processing 40000 bytes of data using 12288 bytes of memory requires 3 spills
        // unless we do something really clever. It will spill roughly 9000+ rows and 36000
        // bytes. We leave a little wiggle room for the actual numbers.
        assert!((3..=10).contains(&spill_count));
        assert!((9000..=10000).contains(&spilled_rows));
        assert!((38000..=44000).contains(&spilled_bytes));

        let columns = result[0].columns();

        // Spot-check the first output batch: sorted ascending, so it starts at
        // the minimum value 0. NOTE(review): the trailing value 81 appears to
        // depend on where the first output batch boundary falls — confirm if
        // batch-size defaults change.
        let i = as_primitive_array::<Int32Type>(&columns[0])?;
        assert_eq!(i.value(0), 0);
        assert_eq!(i.value(i.len() - 1), 81);
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1638
    #[tokio::test]
    async fn test_batch_reservation_error() -> Result<()> {
        // Pick a memory limit and sort_spill_reservation that make the first batch reservation fail.
        let merge_reservation: usize = 0; // Set to 0 for simplicity

        let session_config =
            SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);

        let plan = test::scan_partitioned(1);

        // Read the first record batch to determine the actual memory requirement
        let expected_batch_reservation = {
            let temp_ctx = Arc::new(TaskContext::default());
            let mut stream = plan.execute(0, Arc::clone(&temp_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            get_reserved_bytes_for_record_batch(&first_batch)?
        };

        // Set memory limit just short of what we need
        let memory_limit: usize = expected_batch_reservation + merge_reservation - 1;

        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(memory_limit, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // Verify that our memory limit is insufficient
        // (sanity-check the probe above against the real task context, so the
        // test fails loudly if batch sizing ever becomes non-deterministic)
        {
            let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            let batch_reservation = get_reserved_bytes_for_record_batch(&first_batch)?;

            assert_eq!(batch_reservation, expected_batch_reservation);
            assert!(memory_limit < (merge_reservation + batch_reservation));
        }

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr::new_default(col("i", &plan.schema())?)].into(),
            plan,
        ));

        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await;

        // The sort must fail with a Context error...
        let err = result.unwrap_err();
        assert!(
            matches!(err, DataFusionError::Context(..)),
            "Assertion failed: expected a Context error, but got: {err:?}"
        );

        // Assert that the context error is wrapping a resources exhausted error.
        assert!(
            matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
            "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
        );

        // Verify external sorter error message when resource is exhausted:
        // the message should point users at the two knobs that control memory.
        let config_vector = vec![
            "datafusion.runtime.memory_limit",
            "datafusion.execution.sort_spill_reservation_bytes",
        ];
        let error_message = err.message().to_string();
        for config in config_vector.into_iter() {
            assert!(
                error_message.as_str().contains(config),
                "Config: '{}' should be contained in error message: {}.",
                config,
                error_message.as_str()
            );
        }

        Ok(())
    }
1715
    #[tokio::test]
    async fn test_sort_spill_utf8_strings() -> Result<()> {
        let session_config = SessionConfig::new()
            .with_batch_size(100)
            .with_sort_in_place_threshold_bytes(20 * 1024)
            .with_sort_spill_reservation_bytes(100 * 1024);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(500 * 1024, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // The input has 200 partitions, each partition has a batch containing 100 rows.
        // Each row has a single Utf8 column, the Utf8 string values are roughly 42 bytes.
        // The total size of the input is roughly 820 KB.
        let input = test::scan_partitioned_utf8(200);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]
            .into(),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;

        // All 200 * 100 input rows must survive the spill/merge round trip.
        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
        assert_eq!(num_rows, 20000);

        // Now, validate metrics
        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 20000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();

        // This test case is processing 840KB of data using 400KB of memory. Note
        // that buffered batches can't be dropped until all sorted batches are
        // generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
        // batches.
        // The number of spills is roughly calculated as:
        //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`

        // If this assertion fail with large spill count, make sure the following
        // case does not happen:
        // During external sorting, one sorted run should be spilled to disk in a
        // single file, due to memory limit we might need to append to the file
        // multiple times to spill all the data. Make sure we're not writing each
        // appending as a separate file.
        assert!((4..=8).contains(&spill_count));
        assert!((15000..=20000).contains(&spilled_rows));
        assert!((900000..=1000000).contains(&spilled_bytes));

        // Verify that the result is sorted: concatenate all output batches and
        // check pairwise non-decreasing order of the string column.
        let concated_result = concat_batches(&schema, &result)?;
        let columns = concated_result.columns();
        let string_array = as_string_array(&columns[0]);
        for i in 0..string_array.len() - 1 {
            assert!(string_array.value(i) <= string_array.value(i + 1));
        }

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }
1794
    #[tokio::test]
    async fn test_sort_fetch_memory_calculation() -> Result<()> {
        // This test mirrors the sizes from the spill test above, scaled down.
        let avg_batch_size = 400;
        let partitions = 4;

        // A tuple of (fetch, expect_spillage)
        let test_options = vec![
            // Since we don't have a limit (and the memory is less than the total size of
            // all the batches we are processing), we expect it to spill.
            (None, true),
            // When we have a limit however, the buffered size of batches should fit in memory
            // since it is much lower than the total size of the input batch.
            (Some(1), false),
        ];

        for (fetch, expect_spillage) in test_options {
            let session_config = SessionConfig::new();
            let sort_spill_reservation_bytes = session_config
                .options()
                .execution
                .sort_spill_reservation_bytes;

            // Leave room for all but one batch on top of the merge reservation,
            // so the unlimited sort is guaranteed to overflow.
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(
                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                    1.0,
                )
                .build_arc()?;
            let task_ctx = Arc::new(
                TaskContext::default()
                    .with_runtime(runtime)
                    .with_session_config(session_config),
            );

            let csv = test::scan_partitioned(partitions);
            let schema = csv.schema();

            let sort_exec = Arc::new(
                SortExec::new(
                    [PhysicalSortExpr {
                        expr: col("i", &schema)?,
                        options: SortOptions::default(),
                    }]
                    .into(),
                    Arc::new(CoalescePartitionsExec::new(csv)),
                )
                .with_fetch(fetch),
            );

            let result =
                collect(Arc::clone(&sort_exec) as _, Arc::clone(&task_ctx)).await?;
            assert_eq!(result.len(), 1);

            // Spilling is observable purely through the metrics.
            let metrics = sort_exec.metrics().unwrap();
            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
        }
        Ok(())
    }
1855
    #[tokio::test]
    async fn test_sort_memory_reduction_per_batch() -> Result<()> {
        // This test verifies that memory reservation is reduced for every batch emitted
        // during the sort process. This is important to ensure we don't hold onto
        // memory longer than necessary.

        // Create a large enough batch that will be split into multiple output batches
        let batch_size = 50; // Small batch size to force multiple output batches
        let num_rows = 1000; // Create enough data for multiple batches

        let task_ctx = Arc::new(
            TaskContext::default().with_session_config(
                SessionConfig::new()
                    .with_batch_size(batch_size)
                    .with_sort_in_place_threshold_bytes(usize::MAX), // Ensure we don't concat batches
            ),
        );

        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // Create unsorted data (strictly descending, so the sort must reorder
        // every row)
        let mut values: Vec<i32> = (0..num_rows).collect();
        values.reverse();

        let input_batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(values))],
        )?;

        let batches = vec![input_batch];

        let sort_exec = Arc::new(SortExec::new(
            [PhysicalSortExpr {
                expr: Arc::new(Column::new("a", 0)),
                options: SortOptions::default(),
            }]
            .into(),
            TestMemoryExec::try_new_exec(
                std::slice::from_ref(&batches),
                Arc::clone(&schema),
                None,
            )?,
        ));

        let mut stream = sort_exec.execute(0, Arc::clone(&task_ctx))?;

        let mut previous_reserved = task_ctx.runtime_env().memory_pool.reserved();
        let mut batch_count = 0;

        // Collect batches and verify memory is reduced with each batch
        while let Some(result) = stream.next().await {
            let batch = result?;
            batch_count += 1;

            // Verify we got a non-empty batch
            assert!(batch.num_rows() > 0, "Batch should not be empty");

            let current_reserved = task_ctx.runtime_env().memory_pool.reserved();

            // After the first batch, memory should be reducing or staying the same
            // (it should not increase as we emit batches)
            if batch_count > 1 {
                assert!(
                    current_reserved <= previous_reserved,
                    "Memory reservation should decrease or stay same as batches are emitted. \
                     Batch {batch_count}: previous={previous_reserved}, current={current_reserved}"
                );
            }

            previous_reserved = current_reserved;
        }

        assert!(
            batch_count > 1,
            "Expected multiple batches to be emitted, got {batch_count}"
        );

        // Verify all memory is returned at the end
        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "All memory should be returned after consuming all batches"
        );

        Ok(())
    }
1942
1943    #[tokio::test]
1944    async fn test_sort_metadata() -> Result<()> {
1945        let task_ctx = Arc::new(TaskContext::default());
1946        let field_metadata: HashMap<String, String> =
1947            vec![("foo".to_string(), "bar".to_string())]
1948                .into_iter()
1949                .collect();
1950        let schema_metadata: HashMap<String, String> =
1951            vec![("baz".to_string(), "barf".to_string())]
1952                .into_iter()
1953                .collect();
1954
1955        let mut field = Field::new("field_name", DataType::UInt64, true);
1956        field.set_metadata(field_metadata.clone());
1957        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
1958        let schema = Arc::new(schema);
1959
1960        let data: ArrayRef =
1961            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
1962
1963        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data])?;
1964        let input =
1965            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;
1966
1967        let sort_exec = Arc::new(SortExec::new(
1968            [PhysicalSortExpr {
1969                expr: col("field_name", &schema)?,
1970                options: SortOptions::default(),
1971            }]
1972            .into(),
1973            input,
1974        ));
1975
1976        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
1977
1978        let expected_data: ArrayRef =
1979            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
1980        let expected_batch =
1981            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data])?;
1982
1983        // Data is correct
1984        assert_eq!(&vec![expected_batch], &result);
1985
1986        // explicitly ensure the metadata is present
1987        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
1988        assert_eq!(result[0].schema().metadata(), &schema_metadata);
1989
1990        Ok(())
1991    }
1992
    #[tokio::test]
    async fn test_lex_sort_by_mixed_types() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
        ]));

        // define data.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(3)]),
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                ])),
            ],
        )?;

        // Lexicographic sort: "a" ascending with nulls first, then "b"
        // descending with nulls last as the tie-breaker.
        let sort_exec = Arc::new(SortExec::new(
            [
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: false,
                    },
                },
            ]
            .into(),
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
        ));

        // Sorting must not alter the output schema's data types.
        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
        assert_eq!(
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            *sort_exec.schema().field(1).data_type()
        );

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 4);
        assert_eq!(result.len(), 1);

        // Expected order: the null "a" row first, then a=1, then the two a=2
        // rows tie-broken by "b" descending ([5] before [3]).
        let expected = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                    Some(vec![Some(3)]),
                ])),
            ],
        )?;

        assert_eq!(expected, result[0]);

        Ok(())
    }
2070
    #[tokio::test]
    async fn test_lex_sort_by_float() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, true),
            Field::new("b", DataType::Float64, true),
        ]));

        // define data.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![
                    Some(f32::NAN),
                    None,
                    None,
                    Some(f32::NAN),
                    Some(1.0_f32),
                    Some(1.0_f32),
                    Some(2.0_f32),
                    Some(3.0_f32),
                ])),
                Arc::new(Float64Array::from(vec![
                    Some(200.0_f64),
                    Some(20.0_f64),
                    Some(10.0_f64),
                    Some(100.0_f64),
                    Some(f64::NAN),
                    None,
                    None,
                    Some(f64::NAN),
                ])),
            ],
        )?;

        // Sort "a" descending with nulls first, tie-break on "b" ascending
        // with nulls last. This exercises NaN handling in both directions.
        let sort_exec = Arc::new(SortExec::new(
            [
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                },
            ]
            .into(),
            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
        ));

        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 8);
        assert_eq!(result.len(), 1);

        let columns = result[0].columns();

        assert_eq!(DataType::Float32, *columns[0].data_type());
        assert_eq!(DataType::Float64, *columns[1].data_type());

        let a = as_primitive_array::<Float32Type>(&columns[0])?;
        let b = as_primitive_array::<Float64Type>(&columns[1])?;

        // convert result to strings to allow comparing to expected result containing NaN
        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
            .map(|i| {
                let aval = if a.is_valid(i) {
                    Some(a.value(i).to_string())
                } else {
                    None
                };
                let bval = if b.is_valid(i) {
                    Some(b.value(i).to_string())
                } else {
                    None
                };
                (aval, bval)
            })
            .collect();

        // Expected order shows nulls first, then NaN (which sorts above all
        // numeric values here), then the numbers descending; within a=1 the
        // tie-break puts b=NaN before b=null (nulls_first: false on "b").
        let expected: Vec<(Option<String>, Option<String>)> = vec![
            (None, Some("10".to_owned())),
            (None, Some("20".to_owned())),
            (Some("NaN".to_owned()), Some("100".to_owned())),
            (Some("NaN".to_owned()), Some("200".to_owned())),
            (Some("3".to_owned()), Some("NaN".to_owned())),
            (Some("2".to_owned()), None),
            (Some("1".to_owned()), Some("NaN".to_owned())),
            (Some("1".to_owned()), None),
        ];

        assert_eq!(expected, result);

        Ok(())
    }
2177
2178    #[tokio::test]
2179    async fn test_drop_cancel() -> Result<()> {
2180        let task_ctx = Arc::new(TaskContext::default());
2181        let schema =
2182            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
2183
2184        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
2185        let refs = blocking_exec.refs();
2186        let sort_exec = Arc::new(SortExec::new(
2187            [PhysicalSortExpr {
2188                expr: col("a", &schema)?,
2189                options: SortOptions::default(),
2190            }]
2191            .into(),
2192            blocking_exec,
2193        ));
2194
2195        let fut = collect(sort_exec, Arc::clone(&task_ctx));
2196        let mut fut = fut.boxed();
2197
2198        assert_is_pending(&mut fut);
2199        drop(fut);
2200        assert_strong_count_converges_to_zero(refs).await;
2201
2202        assert_eq!(
2203            task_ctx.runtime_env().memory_pool.reserved(),
2204            0,
2205            "The sort should have returned all memory used back to the memory manager"
2206        );
2207
2208        Ok(())
2209    }
2210
2211    #[test]
2212    fn test_empty_sort_batch() {
2213        let schema = Arc::new(Schema::empty());
2214        let options = RecordBatchOptions::new().with_row_count(Some(1));
2215        let batch =
2216            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
2217                .unwrap();
2218
2219        let expressions = [PhysicalSortExpr {
2220            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2221            options: SortOptions::default(),
2222        }]
2223        .into();
2224
2225        let result = sort_batch(&batch, &expressions, None).unwrap();
2226        assert_eq!(result.num_rows(), 1);
2227    }
2228
    #[tokio::test]
    async fn topk_unbounded_source() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
        // An infinite, already-sorted source emitting 2-row batches of
        // consecutive u64 values.
        let source = SortedUnboundedExec {
            schema: schema.clone(),
            batch_size: 2,
            cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new(
                schema.clone(),
            ))),
        };
        let mut plan = SortExec::new(
            [PhysicalSortExpr::new_default(Arc::new(Column::new(
                "c1", 0,
            )))]
            .into(),
            Arc::new(source),
        );
        // A fetch limit turns the sort into a TopK; this is what lets the
        // query terminate against an unbounded input.
        plan = plan.with_fetch(Some(9));

        let batches = collect(Arc::new(plan), task_ctx).await?;
        assert_snapshot!(batches_to_string(&batches), @r"
        +----+
        | c1 |
        +----+
        | 0  |
        | 1  |
        | 2  |
        | 3  |
        | 4  |
        | 5  |
        | 6  |
        | 7  |
        | 8  |
        +----+
        ");
        Ok(())
    }
2267
2268    #[tokio::test]
2269    async fn should_return_stream_with_batches_in_the_requested_size() -> Result<()> {
2270        let batch_size = 100;
2271
2272        let create_task_ctx = |_: &[RecordBatch]| {
2273            TaskContext::default().with_session_config(
2274                SessionConfig::new()
2275                    .with_batch_size(batch_size)
2276                    .with_sort_in_place_threshold_bytes(usize::MAX),
2277            )
2278        };
2279
2280        // Smaller than batch size and require more than a single batch to get the requested batch size
2281        test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2282
2283        // Not evenly divisible by batch size
2284        test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2285
2286        // Evenly divisible by batch size and is larger than 2 output batches
2287        test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2288
2289        Ok(())
2290    }
2291
2292    #[tokio::test]
2293    async fn should_return_stream_with_batches_in_the_requested_size_when_sorting_in_place()
2294    -> Result<()> {
2295        let batch_size = 100;
2296
2297        let create_task_ctx = |_: &[RecordBatch]| {
2298            TaskContext::default().with_session_config(
2299                SessionConfig::new()
2300                    .with_batch_size(batch_size)
2301                    .with_sort_in_place_threshold_bytes(usize::MAX - 1),
2302            )
2303        };
2304
2305        // Smaller than batch size and require more than a single batch to get the requested batch size
2306        {
2307            let metrics =
2308                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2309
2310            assert_eq!(
2311                metrics.spill_count(),
2312                Some(0),
2313                "Expected no spills when sorting in place"
2314            );
2315        }
2316
2317        // Not evenly divisible by batch size
2318        {
2319            let metrics =
2320                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2321
2322            assert_eq!(
2323                metrics.spill_count(),
2324                Some(0),
2325                "Expected no spills when sorting in place"
2326            );
2327        }
2328
2329        // Evenly divisible by batch size and is larger than 2 output batches
2330        {
2331            let metrics =
2332                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2333
2334            assert_eq!(
2335                metrics.spill_count(),
2336                Some(0),
2337                "Expected no spills when sorting in place"
2338            );
2339        }
2340
2341        Ok(())
2342    }
2343
2344    #[tokio::test]
2345    async fn should_return_stream_with_batches_in_the_requested_size_when_having_a_single_batch()
2346    -> Result<()> {
2347        let batch_size = 100;
2348
2349        let create_task_ctx = |_: &[RecordBatch]| {
2350            TaskContext::default()
2351                .with_session_config(SessionConfig::new().with_batch_size(batch_size))
2352        };
2353
2354        // Smaller than batch size and require more than a single batch to get the requested batch size
2355        {
2356            let metrics = test_sort_output_batch_size(
2357                // Single batch
2358                1,
2359                batch_size / 4,
2360                create_task_ctx,
2361            )
2362            .await?;
2363
2364            assert_eq!(
2365                metrics.spill_count(),
2366                Some(0),
2367                "Expected no spills when sorting in place"
2368            );
2369        }
2370
2371        // Not evenly divisible by batch size
2372        {
2373            let metrics = test_sort_output_batch_size(
2374                // Single batch
2375                1,
2376                batch_size + 7,
2377                create_task_ctx,
2378            )
2379            .await?;
2380
2381            assert_eq!(
2382                metrics.spill_count(),
2383                Some(0),
2384                "Expected no spills when sorting in place"
2385            );
2386        }
2387
2388        // Evenly divisible by batch size and is larger than 2 output batches
2389        {
2390            let metrics = test_sort_output_batch_size(
2391                // Single batch
2392                1,
2393                batch_size * 3,
2394                create_task_ctx,
2395            )
2396            .await?;
2397
2398            assert_eq!(
2399                metrics.spill_count(),
2400                Some(0),
2401                "Expected no spills when sorting in place"
2402            );
2403        }
2404
2405        Ok(())
2406    }
2407
2408    #[tokio::test]
2409    async fn should_return_stream_with_batches_in_the_requested_size_when_having_to_spill()
2410    -> Result<()> {
2411        let batch_size = 100;
2412
2413        let create_task_ctx = |generated_batches: &[RecordBatch]| {
2414            let batches_memory = generated_batches
2415                .iter()
2416                .map(|b| b.get_array_memory_size())
2417                .sum::<usize>();
2418
2419            TaskContext::default()
2420                .with_session_config(
2421                    SessionConfig::new()
2422                        .with_batch_size(batch_size)
2423                        // To make sure there is no in place sorting
2424                        .with_sort_in_place_threshold_bytes(1)
2425                        .with_sort_spill_reservation_bytes(1),
2426                )
2427                .with_runtime(
2428                    RuntimeEnvBuilder::default()
2429                        .with_memory_limit(batches_memory, 1.0)
2430                        .build_arc()
2431                        .unwrap(),
2432                )
2433        };
2434
2435        // Smaller than batch size and require more than a single batch to get the requested batch size
2436        {
2437            let metrics =
2438                test_sort_output_batch_size(10, batch_size / 4, create_task_ctx).await?;
2439
2440            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2441        }
2442
2443        // Not evenly divisible by batch size
2444        {
2445            let metrics =
2446                test_sort_output_batch_size(10, batch_size + 7, create_task_ctx).await?;
2447
2448            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2449        }
2450
2451        // Evenly divisible by batch size and is larger than 2 batches
2452        {
2453            let metrics =
2454                test_sort_output_batch_size(10, batch_size * 3, create_task_ctx).await?;
2455
2456            assert_ne!(metrics.spill_count().unwrap(), 0, "expected to spill");
2457        }
2458
2459        Ok(())
2460    }
2461
2462    async fn test_sort_output_batch_size(
2463        number_of_batches: usize,
2464        batch_size_to_generate: usize,
2465        create_task_ctx: impl Fn(&[RecordBatch]) -> TaskContext,
2466    ) -> Result<MetricsSet> {
2467        let batches = (0..number_of_batches)
2468            .map(|_| make_partition(batch_size_to_generate as i32))
2469            .collect::<Vec<_>>();
2470        let task_ctx = create_task_ctx(batches.as_slice());
2471
2472        let expected_batch_size = task_ctx.session_config().batch_size();
2473
2474        let (mut output_batches, metrics) =
2475            run_sort_on_input(task_ctx, "i", batches).await?;
2476
2477        let last_batch = output_batches.pop().unwrap();
2478
2479        for batch in output_batches {
2480            assert_eq!(batch.num_rows(), expected_batch_size);
2481        }
2482
2483        let mut last_expected_batch_size =
2484            (batch_size_to_generate * number_of_batches) % expected_batch_size;
2485        if last_expected_batch_size == 0 {
2486            last_expected_batch_size = expected_batch_size;
2487        }
2488        assert_eq!(last_batch.num_rows(), last_expected_batch_size);
2489
2490        Ok(metrics)
2491    }
2492
2493    async fn run_sort_on_input(
2494        task_ctx: TaskContext,
2495        order_by_col: &str,
2496        batches: Vec<RecordBatch>,
2497    ) -> Result<(Vec<RecordBatch>, MetricsSet)> {
2498        let task_ctx = Arc::new(task_ctx);
2499
2500        // let task_ctx = env.
2501        let schema = batches[0].schema();
2502        let ordering: LexOrdering = [PhysicalSortExpr {
2503            expr: col(order_by_col, &schema)?,
2504            options: SortOptions {
2505                descending: false,
2506                nulls_first: true,
2507            },
2508        }]
2509        .into();
2510        let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
2511            ordering.clone(),
2512            TestMemoryExec::try_new_exec(std::slice::from_ref(&batches), schema, None)?,
2513        ));
2514
2515        let sorted_batches =
2516            collect(Arc::clone(&sort_exec), Arc::clone(&task_ctx)).await?;
2517
2518        let metrics = sort_exec.metrics().expect("sort have metrics");
2519
2520        // assert output
2521        {
2522            let input_batches_concat = concat_batches(batches[0].schema_ref(), &batches)?;
2523            let sorted_input_batch = sort_batch(&input_batches_concat, &ordering, None)?;
2524
2525            let sorted_batches_concat =
2526                concat_batches(sorted_batches[0].schema_ref(), &sorted_batches)?;
2527
2528            assert_eq!(sorted_input_batch, sorted_batches_concat);
2529        }
2530
2531        Ok((sorted_batches, metrics))
2532    }
2533
2534    #[tokio::test]
2535    async fn test_sort_batch_chunked_basic() -> Result<()> {
2536        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2537
2538        // Create a batch with 1000 rows
2539        let mut values: Vec<i32> = (0..1000).collect();
2540        // Shuffle to make it unsorted
2541        values.reverse();
2542
2543        let batch = RecordBatch::try_new(
2544            Arc::clone(&schema),
2545            vec![Arc::new(Int32Array::from(values))],
2546        )?;
2547
2548        let expressions: LexOrdering =
2549            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2550
2551        // Sort with batch_size = 250
2552        let result_batches = sort_batch_chunked(&batch, &expressions, 250)?;
2553
2554        // Verify 4 batches are returned
2555        assert_eq!(result_batches.len(), 4);
2556
2557        // Verify each batch has <= 250 rows
2558        let mut total_rows = 0;
2559        for (i, batch) in result_batches.iter().enumerate() {
2560            assert!(
2561                batch.num_rows() <= 250,
2562                "Batch {} has {} rows, expected <= 250",
2563                i,
2564                batch.num_rows()
2565            );
2566            total_rows += batch.num_rows();
2567        }
2568
2569        // Verify total row count matches input
2570        assert_eq!(total_rows, 1000);
2571
2572        // Verify data is correctly sorted across all chunks
2573        let concatenated = concat_batches(&schema, &result_batches)?;
2574        let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2575        for i in 0..array.len() - 1 {
2576            assert!(
2577                array.value(i) <= array.value(i + 1),
2578                "Array not sorted at position {}: {} > {}",
2579                i,
2580                array.value(i),
2581                array.value(i + 1)
2582            );
2583        }
2584        assert_eq!(array.value(0), 0);
2585        assert_eq!(array.value(array.len() - 1), 999);
2586
2587        Ok(())
2588    }
2589
2590    #[tokio::test]
2591    async fn test_sort_batch_chunked_smaller_than_batch_size() -> Result<()> {
2592        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2593
2594        // Create a batch with 50 rows
2595        let values: Vec<i32> = (0..50).rev().collect();
2596        let batch = RecordBatch::try_new(
2597            Arc::clone(&schema),
2598            vec![Arc::new(Int32Array::from(values))],
2599        )?;
2600
2601        let expressions: LexOrdering =
2602            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2603
2604        // Sort with batch_size = 100
2605        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2606
2607        // Should return exactly 1 batch
2608        assert_eq!(result_batches.len(), 1);
2609        assert_eq!(result_batches[0].num_rows(), 50);
2610
2611        // Verify it's correctly sorted
2612        let array = as_primitive_array::<Int32Type>(result_batches[0].column(0))?;
2613        for i in 0..array.len() - 1 {
2614            assert!(array.value(i) <= array.value(i + 1));
2615        }
2616        assert_eq!(array.value(0), 0);
2617        assert_eq!(array.value(49), 49);
2618
2619        Ok(())
2620    }
2621
2622    #[tokio::test]
2623    async fn test_sort_batch_chunked_exact_multiple() -> Result<()> {
2624        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2625
2626        // Create a batch with 1000 rows
2627        let values: Vec<i32> = (0..1000).rev().collect();
2628        let batch = RecordBatch::try_new(
2629            Arc::clone(&schema),
2630            vec![Arc::new(Int32Array::from(values))],
2631        )?;
2632
2633        let expressions: LexOrdering =
2634            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2635
2636        // Sort with batch_size = 100
2637        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2638
2639        // Should return exactly 10 batches of 100 rows each
2640        assert_eq!(result_batches.len(), 10);
2641        for batch in &result_batches {
2642            assert_eq!(batch.num_rows(), 100);
2643        }
2644
2645        // Verify sorted correctly across all batches
2646        let concatenated = concat_batches(&schema, &result_batches)?;
2647        let array = as_primitive_array::<Int32Type>(concatenated.column(0))?;
2648        for i in 0..array.len() - 1 {
2649            assert!(array.value(i) <= array.value(i + 1));
2650        }
2651
2652        Ok(())
2653    }
2654
2655    #[tokio::test]
2656    async fn test_sort_batch_chunked_empty_batch() -> Result<()> {
2657        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2658
2659        let batch = RecordBatch::new_empty(Arc::clone(&schema));
2660
2661        let expressions: LexOrdering =
2662            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into();
2663
2664        let result_batches = sort_batch_chunked(&batch, &expressions, 100)?;
2665
2666        // Empty input produces no output batches (0 chunks)
2667        assert_eq!(result_batches.len(), 0);
2668
2669        Ok(())
2670    }
2671
2672    #[tokio::test]
2673    async fn test_get_reserved_bytes_for_record_batch_with_sliced_batches() -> Result<()>
2674    {
2675        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2676
2677        // Create a larger batch then slice it
2678        let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
2679        let sliced_array = large_array.slice(100, 50); // Take 50 elements starting at 100
2680
2681        let sliced_batch =
2682            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(sliced_array)])?;
2683        let batch =
2684            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(large_array)])?;
2685
2686        let sliced_reserved = get_reserved_bytes_for_record_batch(&sliced_batch)?;
2687        let reserved = get_reserved_bytes_for_record_batch(&batch)?;
2688
2689        // The reserved memory for the sliced batch should be less than that of the full batch
2690        assert!(reserved > sliced_reserved);
2691
2692        Ok(())
2693    }
2694}