datafusion_physical_plan/sorts/sort.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Sort operator that handles input of arbitrary size. It sorts
//! in memory when there is sufficient memory budget, and spills
//! to disk when needed.

use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
use crate::metrics::{
    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
};
use crate::projection::{make_with_child, update_expr, ProjectionExec};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::get_record_batch_memory_size;
use crate::spill::in_progress_spill_file::InProgressSpillFile;
use crate::spill::spill_manager::SpillManager;
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
    Statistics,
};

use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};

struct ExternalSorterMetrics {
    /// Baseline metrics (output rows, elapsed compute time, etc.)
    baseline: BaselineMetrics,

    spill_metrics: SpillMetrics,
}

impl ExternalSorterMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            spill_metrics: SpillMetrics::new(metrics, partition),
        }
    }
}

/// Sorts an arbitrarily sized, unsorted stream of [`RecordBatch`]es to
/// a total order. Depending on the input size and memory manager
/// configuration, writes intermediate results to disk ("spills")
/// using Arrow IPC format.
///
/// # Algorithm
///
/// 1. get a non-empty new batch from input
///
/// 2. check with the memory manager that there is sufficient space to
///    buffer the batch in memory.
///
/// 2.1 if memory is sufficient, buffer the batch in memory, go to 1.
///
/// 2.2 if no more memory is available, sort all buffered batches and
///     spill to file, then buffer the next batch in memory, go to 1.
///
/// 3. when input is exhausted, merge all in-memory batches and spills
///    to get a total order.
///
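/// For example, the driver loop in [`SortExec::execute`] below follows this
/// simplified sketch:
///
/// ```text
/// while let Some(batch) = input.next().await {
///     let batch = batch?;
///     sorter.insert_batch(batch).await?; // may sort and spill internally
/// }
/// let output = sorter.sort().await; // merges in-memory batches and spills
/// ```
///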
/// # When data fits in available memory
///
/// If there is sufficient memory, data is sorted in memory to produce the output
///
/// ```text
///    ┌─────┐
///    │  2  │
///    │  3  │
///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///    │  4  │
///    │  2  │                  │
///    └─────┘                  ▼
///    ┌─────┐
///    │  1  │              In memory
///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
///    │  1  │
///    └─────┘                  ▲
///      ...                    │
///
///    ┌─────┐                  │
///    │  4  │
///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///    └─────┘
///
/// in_mem_batches
///
/// ```
///
/// # When data does not fit in available memory
///
///  When memory is exhausted, data is first sorted and written to one
///  or more spill files on disk:
///
/// ```text
///    ┌─────┐                               .─────────────────.
///    │  2  │                              (                   )
///    │  3  │                              │`─────────────────'│
///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
///    │  4  │             │                │  │ 1  │░          │
///    │  2  │                              │  │... │░          │
///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
///    ┌─────┐                              │  └────┘░    1  │░ │
///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
///    │  1  │     and write to file        │           │    ░░ │
///    └─────┘                              │             4  │░ │
///      ...               ▲                │           └░─░─░░ │
///                        │                │            ░░░░░░ │
///    ┌─────┐                              │.─────────────────.│
///    │  4  │             │                (                   )
///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
///    └─────┘
///
/// in_mem_batches                                  spills
///                                         (file on disk in Arrow
///                                               IPC format)
/// ```
///
/// Once the input is completely read, the spill files are read and
/// merged with any in memory batches to produce a single total sorted
/// output:
///
/// ```text
///   .─────────────────.
///  (                   )
///  │`─────────────────'│
///  │  ┌────┐           │
///  │  │ 1  │░          │
///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
///  │  │ 4  │░ ┌────┐   │           │
///  │  └────┘░ │ 1  │░  │           ▼
///  │   ░░░░░░ │    │░  │
///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
///  │          │    │░  │
///  │          │ 4  │░  │           ▲
///  │          └────┘░  │           │
///  │           ░░░░░░  │
///  │.─────────────────.│           │
///  (                   )
///   `─────────────────'            │
///         spills
///                                  │
///
///                                  │
///
///     ┌─────┐                      │
///     │  1  │
///     │  4  │─ ─ ─ ─               │
///     └─────┘       │
///       ...                   In memory
///                   └ ─ ─ ─▶  sort/merge
///     ┌─────┐
///     │  4  │                      ▲
///     │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     └─────┘
///
///  in_mem_batches
/// ```
struct ExternalSorter {
    // ========================================================================
    // PROPERTIES:
    // Fields that define the sorter's configuration and remain constant
    // ========================================================================
    /// Schema of the output (and the input)
    schema: SchemaRef,
    /// Sort expressions
    expr: Arc<[PhysicalSortExpr]>,
    /// The target number of rows for output batches
    batch_size: usize,
    /// If the total size of the buffered in-memory batches is below this
    /// threshold, the data will be concatenated and sorted in place rather
    /// than sort/merged.
    sort_in_place_threshold_bytes: usize,

    // ========================================================================
    // STATE BUFFERS:
    // Fields that hold intermediate data during sorting
    // ========================================================================
    /// Unsorted input batches stored in the memory buffer
    in_mem_batches: Vec<RecordBatch>,

    /// During external sorting, in-memory intermediate data will be appended to
    /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
    in_progress_spill_file: Option<InProgressSpillFile>,
    /// If data has previously been spilled, the locations of the spill files (in
    /// Arrow IPC format)
    /// Within the same spill file, the data might be chunked into multiple batches,
    /// and ordered by sort keys.
    finished_spill_files: Vec<RefCountedTempFile>,

    // ========================================================================
    // EXECUTION RESOURCES:
    // Fields related to managing execution resources and monitoring performance.
    // ========================================================================
    /// Runtime metrics
    metrics: ExternalSorterMetrics,
    /// A handle to the runtime to get spill files
    runtime: Arc<RuntimeEnv>,
    /// Reservation for in_mem_batches
    reservation: MemoryReservation,
    spill_manager: SpillManager,

    /// Reservation for the merging of in-memory batches. If the sort
    /// might spill, `sort_spill_reservation_bytes` will be
    /// pre-reserved to ensure there is some space for this sort/merge.
    merge_reservation: MemoryReservation,
    /// How much memory to reserve for performing in-memory sort/merges
    /// prior to spilling.
    sort_spill_reservation_bytes: usize,
}

impl ExternalSorter {
    // TODO: make a builder or some other nicer API to avoid the
    // clippy warning
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<Self> {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);

        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);

        let spill_manager = SpillManager::new(
            Arc::clone(&runtime),
            metrics.spill_metrics.clone(),
            Arc::clone(&schema),
        );

        Ok(Self {
            schema,
            in_mem_batches: vec![],
            in_progress_spill_file: None,
            finished_spill_files: vec![],
            expr: expr.into(),
            metrics,
            reservation,
            spill_manager,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        })
    }

    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
    ///
    /// Updates memory usage metrics, and possibly triggers spilling to disk
    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
        if input.num_rows() == 0 {
            return Ok(());
        }

        self.reserve_memory_for_merge()?;
        self.reserve_memory_for_batch_and_maybe_spill(&input)
            .await?;

        self.in_mem_batches.push(input);
        Ok(())
    }

    fn spilled_before(&self) -> bool {
        !self.finished_spill_files.is_empty()
    }

    /// Returns the final sorted output of all batches inserted via
    /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
    ///
    /// This process could either be:
    ///
    /// 1. An in-memory sort/merge (if the input fit in memory)
    ///
    /// 2. A combined streaming merge incorporating both in-memory
    ///    batches and data from spill files on disk.
    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
        // Release the memory reserved for merge back to the pool so
        // there is some left when `in_mem_sort_stream` requests an
        // allocation.
        self.merge_reservation.free();

        if self.spilled_before() {
            let mut streams = vec![];

            // Sort `in_mem_batches` and spill them first. If there are many
            // `in_mem_batches` and the memory limit is almost reached, merging
            // them with the spilled files at the same time might cause OOM.
            if !self.in_mem_batches.is_empty() {
                self.sort_and_spill_in_mem_batches().await?;
            }

            for spill in self.finished_spill_files.drain(..) {
                if !spill.path().exists() {
                    return internal_err!("Spill file {:?} does not exist", spill.path());
                }
                let stream = self.spill_manager.read_spill_as_stream(spill)?;
                streams.push(stream);
            }

            let expressions: LexOrdering = self.expr.iter().cloned().collect();

            StreamingMergeBuilder::new()
                .with_streams(streams)
                .with_schema(Arc::clone(&self.schema))
                .with_expressions(expressions.as_ref())
                .with_metrics(self.metrics.baseline.clone())
                .with_batch_size(self.batch_size)
                .with_fetch(None)
                .with_reservation(self.merge_reservation.new_empty())
                .build()
        } else {
            self.in_mem_sort_stream(self.metrics.baseline.clone())
        }
    }

    /// How much memory is buffered in this `ExternalSorter`?
    fn used(&self) -> usize {
        self.reservation.size()
    }

    /// How many bytes have been spilled to disk?
    fn spilled_bytes(&self) -> usize {
        self.metrics.spill_metrics.spilled_bytes.value()
    }

    /// How many rows have been spilled to disk?
    fn spilled_rows(&self) -> usize {
        self.metrics.spill_metrics.spilled_rows.value()
    }

    /// How many spill files have been created?
    fn spill_count(&self) -> usize {
        self.metrics.spill_metrics.spill_file_count.value()
    }

    /// Appends globally sorted batches to the in-progress spill file, and clears
    /// `globally_sorted_batches` (and its memory reservation) afterwards.
    async fn consume_and_spill_append(
        &mut self,
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        if globally_sorted_batches.is_empty() {
            return Ok(());
        }

        // Lazily initialize the in-progress spill file
        if self.in_progress_spill_file.is_none() {
            self.in_progress_spill_file =
                Some(self.spill_manager.create_in_progress_file("Sorting")?);
        }

        Self::organize_stringview_arrays(globally_sorted_batches)?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let batches_to_spill = std::mem::take(globally_sorted_batches);
        self.reservation.free();

        let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
            internal_datafusion_err!("In-progress spill file should be initialized")
        })?;

        for batch in batches_to_spill {
            in_progress_file.append_batch(&batch)?;
        }

        if !globally_sorted_batches.is_empty() {
            return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
        }

        Ok(())
    }

    /// Finishes the in-progress spill file and moves it to the finished spill files.
    async fn spill_finish(&mut self) -> Result<()> {
        let mut in_progress_file =
            self.in_progress_spill_file.take().ok_or_else(|| {
                internal_datafusion_err!("Should be called after `spill_append`")
            })?;
        let spill_file = in_progress_file.finish()?;

        if let Some(spill_file) = spill_file {
            self.finished_spill_files.push(spill_file);
        }

        Ok(())
    }

    /// Reconstructs `globally_sorted_batches` to organize the payload buffers of each
    /// `StringViewArray` in sequential order by calling `gc()` on them.
    ///
    /// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
    /// available
    ///
    /// # Rationale
    /// After (merge-based) sorting, all batches will be sorted into a single run,
    /// but physically this sorted run is chunked into many small batches. For
    /// `StringViewArray`s inside each sorted run, their inner buffers are not
    /// reconstructed by default, leading to non-sequential payload locations
    /// (permuted by the `interleave()` Arrow kernel). A single payload buffer might
    /// be shared by multiple `RecordBatch`es.
    /// When writing each batch to disk, the writer has to write all referenced buffers,
    /// because they have to be read back one by one to reduce memory usage. This
    /// causes extra disk reads and writes, and potentially execution failure.
    ///
    /// # Example
    /// Before sorting:
    /// batch1 -> buffer1
    /// batch2 -> buffer2
    ///
    /// sorted_batch1 -> buffer1
    ///               -> buffer2
    /// sorted_batch2 -> buffer1
    ///               -> buffer2
    ///
    /// Then when spilling each batch, the writer has to write all referenced buffers
    /// repeatedly.
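    ///
    /// After calling `gc()` on each array, every sorted batch references only
    /// its own compacted buffer, so each batch can be spilled independently:
    ///
    /// sorted_batch1 -> new_buffer1
    /// sorted_batch2 -> new_buffer2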
    fn organize_stringview_arrays(
        globally_sorted_batches: &mut Vec<RecordBatch>,
    ) -> Result<()> {
        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

        for batch in globally_sorted_batches.drain(..) {
            let mut new_columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(batch.num_columns());

            let mut arr_mutated = false;
            for array in batch.columns() {
                if let Some(string_view_array) =
                    array.as_any().downcast_ref::<StringViewArray>()
                {
                    let new_array = string_view_array.gc();
                    new_columns.push(Arc::new(new_array));
                    arr_mutated = true;
                } else {
                    new_columns.push(Arc::clone(array));
                }
            }

            let organized_batch = if arr_mutated {
                RecordBatch::try_new(batch.schema(), new_columns)?
            } else {
                batch
            };

            organized_batches.push(organized_batch);
        }

        *globally_sorted_batches = organized_batches;

        Ok(())
    }

    /// Sorts the in-memory batches and merges them into a single sorted run, then writes
    /// the result to spill files.
    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
        if self.in_mem_batches.is_empty() {
            return internal_err!(
                "in_mem_batches must not be empty when attempting to sort and spill"
            );
        }

        // Release the memory reserved for merge back to the pool so
        // there is some left when `in_mem_sort_stream` requests an
        // allocation. At the end of this function, memory will be
        // reserved again for the next spill.
        self.merge_reservation.free();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` are taken
        // to construct a globally sorted stream.
        if !self.in_mem_batches.is_empty() {
            return internal_err!(
                "in_mem_batches should be empty after constructing sorted stream"
            );
        }
        // 'global' here refers to all buffered batches when the memory limit is
        // reached. This variable will buffer the sorted batches after
        // sort-preserving merge and incrementally append to spill files.
        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_byte_for_record_batch(&batch);
            if self.reservation.try_grow(sorted_size).is_err() {
                // Although the reservation is not enough, the batch is
                // already in memory, so it's okay to combine it with previously
                // sorted batches, and spill together.
                globally_sorted_batches.push(batch);
                self.consume_and_spill_append(&mut globally_sorted_batches)
                    .await?; // reservation is freed in consume_and_spill_append()
            } else {
                globally_sorted_batches.push(batch);
            }
        }

        // Drop early to free up memory reserved by the sorted stream, otherwise the
        // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
        drop(sorted_stream);

        self.consume_and_spill_append(&mut globally_sorted_batches)
            .await?;
        self.spill_finish().await?;

        // Sanity check after spilling
        let buffers_cleared_property =
            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
        if !buffers_cleared_property {
            return internal_err!(
                "in_mem_batches and globally_sorted_batches should be cleared before returning"
            );
        }

        // Reserve headroom for next sort/merge
        self.reserve_memory_for_merge()?;

        Ok(())
    }

    /// Consumes `in_mem_batches`, returning a sorted stream of
    /// batches. This proceeds in one of two ways:
    ///
    /// # Small Datasets
    ///
    /// For "smaller" datasets, the data is first concatenated into a
    /// single batch and then sorted. This is often faster than
    /// sorting and then merging.
    ///
    /// ```text
    ///        ┌─────┐
    ///        │  2  │
    ///        │  3  │
    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
    ///        │  4  │                     │  2  │
    ///        │  2  │        │            │  3  │
    ///        └─────┘                     │  1  │             sorted output
    ///        ┌─────┐        ▼            │  4  │                stream
    ///        │  1  │                     │  2  │
    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
    ///        │  1  │                     │  4  │
    ///        └─────┘        ▲            │  1  │
    ///          ...          │            │ ... │
    ///                                    │  4  │
    ///        ┌─────┐        │            │  3  │
    ///        │  4  │                     └─────┘
    ///        │  3  │─ ─ ─ ─ ┘
    ///        └─────┘
    ///     in_mem_batches
    /// ```
    ///
    /// # Larger datasets
    ///
    /// For larger datasets, the batches are first sorted individually
    /// and then merged together.
    ///
    /// ```text
    ///      ┌─────┐                ┌─────┐
    ///      │  2  │                │  1  │
    ///      │  3  │                │  2  │
    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
    ///      │  4  │                │  3  │
    ///      │  2  │                │  4  │          │
    ///      └─────┘                └─────┘               sorted output
    ///      ┌─────┐                ┌─────┐          ▼       stream
    ///      │  1  │                │  1  │
    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
    ///      │  1  │                │  4  │
    ///      └─────┘                └─────┘          ▲
    ///        ...       ...         ...             │
    ///
    ///      ┌─────┐                ┌─────┐          │
    ///      │  4  │                │  3  │
    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
    ///      └─────┘                └─────┘
    ///
    ///   in_mem_batches
    /// ```
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }

        // The elapsed compute timer is updated when the value is dropped.
        // There is no need for an explicit call to drop.
        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();

        // Note that no operation inside `in_mem_sort_stream` performs any memory
        // reservation. This avoids having to handle reservation failures and
        // spilling in the middle of the sort/merge. The memory space for batches
        // produced by the resulting stream will be reserved by the consumer of
        // the stream.

        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, metrics, reservation);
        }

        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            // Concatenate memory batches together and sort
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            self.reservation
                .try_resize(get_reserved_byte_for_record_batch(&batch))
                .map_err(Self::err_with_oom_context)?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, metrics, reservation);
        }

        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                let reservation = self
                    .reservation
                    .split(get_reserved_byte_for_record_batch(&batch));
                let input = self.sort_batch_stream(batch, metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;

        let expressions: LexOrdering = self.expr.iter().cloned().collect();

        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(expressions.as_ref())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(None)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }

    /// Sorts a single `RecordBatch` into a single stream.
    ///
    /// `reservation` accounts for the memory used by this batch and
    /// is released when the sort is complete
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: BaselineMetrics,
        reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(
            get_reserved_byte_for_record_batch(&batch),
            reservation.size()
        );
        let schema = batch.schema();

        let expressions: LexOrdering = self.expr.iter().cloned().collect();
        let stream = futures::stream::once(async move {
            let _timer = metrics.elapsed_compute().timer();

            let sorted = sort_batch(&batch, &expressions, None)?;

            metrics.record_output(sorted.num_rows());
            drop(batch);
            drop(reservation);
            Ok(sorted)
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    /// If this sort may spill, pre-allocates
    /// `sort_spill_reservation_bytes` of memory to guarantee that some
    /// memory remains for the in-memory sort/merge.
    fn reserve_memory_for_merge(&mut self) -> Result<()> {
        // Reserve headroom for next merge sort
        if self.runtime.disk_manager.tmp_files_enabled() {
            let size = self.sort_spill_reservation_bytes;
            if self.merge_reservation.size() != size {
                self.merge_reservation
                    .try_resize(size)
                    .map_err(Self::err_with_oom_context)?;
            }
        }

        Ok(())
    }

    /// Reserves memory to accommodate the given batch.
    /// If memory is scarce, tries to spill the current in-memory batches to disk first.
    async fn reserve_memory_for_batch_and_maybe_spill(
        &mut self,
        input: &RecordBatch,
    ) -> Result<()> {
        let size = get_reserved_byte_for_record_batch(input);

        match self.reservation.try_grow(size) {
            Ok(_) => Ok(()),
            Err(e) => {
                if self.in_mem_batches.is_empty() {
                    return Err(Self::err_with_oom_context(e));
                }

                // Spill and try again.
                self.sort_and_spill_in_mem_batches().await?;
                self.reservation
                    .try_grow(size)
                    .map_err(Self::err_with_oom_context)
            }
        }
    }

    /// Wraps the error with a context message suggesting settings to tweak.
    /// This is meant to be used with DataFusionError::ResourcesExhausted only.
    fn err_with_oom_context(e: DataFusionError) -> DataFusionError {
        match e {
            DataFusionError::ResourcesExhausted(_) => e.context(
                "Not enough memory to continue external sort. \
                    Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes"
            ),
            // This is not an OOM error, so just return it as is.
            _ => e,
        }
    }
}

/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// This is used to pre-reserve memory for the sort/merge. The sort/merge
/// process involves creating sorted copies of the sort columns in each record
/// batch to speed up comparisons during sorting and merging. The sorted copies
/// are in either row format or array format; please refer to cursor.rs and
/// stream.rs for more details. Whatever the format, the sorted copies use
/// more memory than the original record batch.
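///
/// For example, a batch whose [`get_record_batch_memory_size`] is 10 MB
/// reserves 2 * 10 MB = 20 MB here.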
fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
    // 2x may not be enough for some cases, but it's a good start.
    // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
    // to compensate for the extra memory needed.
    get_record_batch_memory_size(batch) * 2
}

impl Debug for ExternalSorter {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("ExternalSorter")
            .field("memory_used", &self.used())
            .field("spilled_bytes", &self.spilled_bytes())
            .field("spilled_rows", &self.spilled_rows())
            .field("spill_count", &self.spill_count())
            .finish()
    }
}

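/// Sorts a single [`RecordBatch`] by `expressions`, optionally keeping only
/// the first `fetch` rows of the sorted result.
///
/// A minimal usage sketch (the schema, the column name "i", and the input
/// `batch` are illustrative assumptions, not defined in this file):
///
/// ```ignore
/// use arrow::compute::SortOptions;
///
/// // `schema` has an Int32 column "i" and `batch` holds unsorted values.
/// let ordering = LexOrdering::new(vec![PhysicalSortExpr {
///     expr: col("i", &schema)?,
///     options: SortOptions::default(),
/// }]);
/// let sorted = sort_batch(&batch, &ordering, None)?;
/// // Passing `fetch = Some(k)` instead keeps only the first k rows.
/// ```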
pub fn sort_batch(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    fetch: Option<usize>,
) -> Result<RecordBatch> {
    let sort_columns = expressions
        .iter()
        .map(|expr| expr.evaluate_to_sort_column(batch))
        .collect::<Result<Vec<_>>>()?;

    let indices = lexsort_to_indices(&sort_columns, fetch)?;
    let mut columns = take_arrays(batch.columns(), &indices, None)?;

    // The columns may be larger than the unsorted columns in `batch` especially for variable length
    // data types due to exponential growth when building the sort columns. We shrink the columns
    // to prevent memory reservation failures, as well as excessive memory allocation when running
    // merges in `SortPreservingMergeStream`.
    columns.iter_mut().for_each(|c| {
        c.shrink_to_fit();
    });

    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
    Ok(RecordBatch::try_new_with_options(
        batch.schema(),
        columns,
        &options,
    )?)
}

/// Sort execution plan.
///
/// Supports sorting datasets that are larger than the memory allotted
/// by the memory manager, by spilling to disk.
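///
/// # Example (sketch)
///
/// ```ignore
/// // Sort all partitions of `input` into a single output partition, ordered
/// // by `ordering` (`input` and `ordering` are assumed to already exist):
/// let sort = Arc::new(SortExec::new(ordering, input));
/// ```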
#[derive(Debug, Clone)]
pub struct SortExec {
    /// The input plan
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions
    expr: LexOrdering,
    /// Contains all metrics sets created during sort
    metrics_set: ExecutionPlanMetricsSet,
    /// Preserve partitions of input plan. If false, the input partitions
    /// will be sorted and merged into a single output partition.
    preserve_partitioning: bool,
    /// Fetch highest/lowest n results
    fetch: Option<usize>,
    /// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
    common_sort_prefix: LexOrdering,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}

impl SortExec {
    /// Create a new sort execution plan that produces a single,
    /// sorted output partition.
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let preserve_partitioning = false;
        let (cache, sort_prefix) =
            Self::compute_properties(&input, expr.clone(), preserve_partitioning);
        Self {
            expr,
            input,
            metrics_set: ExecutionPlanMetricsSet::new(),
            preserve_partitioning,
            fetch: None,
            common_sort_prefix: sort_prefix,
            cache,
        }
    }

    /// Whether this `SortExec` preserves partitioning of the children
    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    /// Specify the partitioning behavior of this sort exec
    ///
    /// If `preserve_partitioning` is true, sorts each partition
    /// individually, producing one sorted stream for each input partition.
    ///
    /// If `preserve_partitioning` is false, sorts and merges all
    /// input partitions producing a single, sorted partition.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        self.cache = self
            .cache
            .with_partitioning(Self::output_partitioning_helper(
                &self.input,
                self.preserve_partitioning,
            ));
        self
    }

    /// Modify how many rows to include in the result
    ///
    /// If None, then all rows will be returned, in sorted order.
    /// If Some, then only the top `fetch` rows will be returned.
    /// This can reduce the memory pressure required by the sort
    /// operation since rows that are not going to be included
    /// can be dropped.
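    ///
    /// For example (a sketch; `ordering` and `input` are assumed to exist):
    ///
    /// ```ignore
    /// // Keep only the 10 smallest rows under `ordering`. When the input is
    /// // not already sorted, this is executed by the TopK operator.
    /// let top10 = SortExec::new(ordering, input).with_fetch(Some(10));
    /// ```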
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = self.cache.clone();
        // If the SortExec can emit incrementally (i.e. the input's properties
        // already satisfy the sort requirements), it can generate its result
        // without scanning the entire input when a fetch value exists.
        let is_pipeline_friendly = matches!(
            self.cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            common_sort_prefix: self.common_sort_prefix.clone(),
            fetch,
            cache,
        }
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions
    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    /// If `Some(fetch)`, limits output to only the first "fetch" items
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn output_partitioning_helper(
        input: &Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Partitioning {
        // Get output partitioning:
        if preserve_partitioning {
            input.output_partitioning().clone()
        } else {
            Partitioning::UnknownPartitioning(1)
        }
    }

    /// This function creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    /// It also returns the common sort prefix between the input and the sort
    /// expressions.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> (PlanProperties, LexOrdering) {
        // Determine execution mode:
        let requirement = LexRequirement::from(sort_exprs);

        let (sort_prefix, sort_satisfied) = input
            .equivalence_properties()
            .extract_common_sort_prefix(&requirement);

        // The emission type depends on whether the input is already sorted:
        // - If already fully sorted, we can emit results in the same way as the input
        // - If not sorted, we must wait until all data is processed to emit results (Final)
        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };

        // The boundedness depends on whether the input is already sorted:
        // - If already sorted, we have the same property as the input
        // - If not sorted and the input is unbounded, we require infinite memory
        //   and generate unbounded data (not practical).
        // - If not sorted and the input is bounded, then the SortExec is bounded, too.
        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };

        // Calculate equivalence properties; i.e. reset the ordering equivalence
        // class with the new ordering:
        let sort_exprs = LexOrdering::from(requirement);
        let eq_properties = input
            .equivalence_properties()
            .clone()
            .with_reorder(sort_exprs);

        // Get output partitioning:
        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);

        (
            PlanProperties::new(
                eq_properties,
                output_partitioning,
                emission_type,
                boundedness,
            ),
            LexOrdering::from(sort_prefix),
        )
    }
}

impl DisplayAs for SortExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let preserve_partitioning = self.preserve_partitioning;
                match self.fetch {
                    Some(fetch) => {
                        write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
                        if !self.common_sort_prefix.is_empty() {
                            write!(f, ", sort_prefix=[{}]", self.common_sort_prefix)
                        } else {
                            Ok(())
                        }
                    }
                    None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
                }
            }
            DisplayFormatType::TreeRender => match self.fetch {
                Some(fetch) => {
                    writeln!(f, "{}", self.expr)?;
                    writeln!(f, "limit={fetch}")
                }
                None => {
                    writeln!(f, "{}", self.expr)
                }
            },
        }
    }
}

impl ExecutionPlan for SortExec {
    fn name(&self) -> &'static str {
        match self.fetch {
            Some(_) => "SortExec(TopK)",
            None => "SortExec",
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            // global sort
            // TODO support RangePartition and OrderedDistribution
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
            .with_fetch(self.fetch)
            .with_preserve_partitioning(self.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {partition}");

        let requirement = &LexRequirement::from(self.expr.clone());

        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy_requirement(requirement);

        match (sort_satisfied, self.fetch.as_ref()) {
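            // The input already satisfies the ordering and a fetch limit
            // exists: simply truncate the input stream.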
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
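            // The input is already fully sorted and there is no fetch limit:
            // pass the stream through unchanged.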
            (true, None) => Ok(input),
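            // Unsorted input with a fetch limit: only the top `fetch` rows
            // are needed, so use the TopK operator instead of a full sort.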
            (false, Some(fetch)) => {
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.common_sort_prefix.clone(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                            if topk.finished {
                                break;
                            }
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
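            // General case: fully sort the unsorted input via the
            // ExternalSorter, spilling to disk if memory runs out.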
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    &self.metrics_set,
                    context.runtime_env(),
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort().await
                    })
                    .try_flatten(),
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if !self.preserve_partitioning() {
            return self.input.partition_statistics(None)?.with_fetch(
                self.schema(),
                self.fetch,
                0,
                1,
            );
        }
        self.input.partition_statistics(partition)?.with_fetch(
            self.schema(),
            self.fetch,
            0,
            1,
        )
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.fetch.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    /// Tries to swap the projection with its input [`SortExec`]. If it can be done,
    /// it returns the new swapped version with the [`SortExec`] as the top plan.
    /// Otherwise, it returns None.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If the projection does not narrow the schema, we should not try to push it down.
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }

        let mut updated_exprs = LexOrdering::default();
        for sort in self.expr() {
            let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)?
            else {
                return Ok(None);
            };
            updated_exprs.push(PhysicalSortExpr {
                expr: new_expr,
                options: sort.options,
            });
        }

        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::*;
    use crate::coalesce_partitions::CoalescePartitionsExec;
    use crate::collect;
    use crate::execution_plan::Boundedness;
    use crate::expressions::col;
    use crate::test;
    use crate::test::assert_is_pending;
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
    use crate::test::TestMemoryExec;

    use arrow::array::*;
    use arrow::compute::SortOptions;
    use arrow::datatypes::*;
    use datafusion_common::cast::as_primitive_array;
    use datafusion_common::test_util::batches_to_string;
    use datafusion_common::{DataFusionError, Result, ScalarValue};
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    use datafusion_execution::RecordBatchStream;
    use datafusion_physical_expr::expressions::{Column, Literal};
    use datafusion_physical_expr::EquivalenceProperties;

    use futures::{FutureExt, Stream};
    use insta::assert_snapshot;

    #[derive(Debug, Clone)]
    pub struct SortedUnboundedExec {
        schema: Schema,
        batch_size: u64,
        cache: PlanProperties,
    }

    impl DisplayAs for SortedUnboundedExec {
        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
            match t {
                DisplayFormatType::Default
                | DisplayFormatType::Verbose
                | DisplayFormatType::TreeRender => write!(f, "UnboundableExec").unwrap(),
            }
            Ok(())
        }
    }

    impl SortedUnboundedExec {
        fn compute_properties(schema: SchemaRef) -> PlanProperties {
            let mut eq_properties = EquivalenceProperties::new(schema);
            eq_properties.add_new_orderings(vec![LexOrdering::new(vec![
                PhysicalSortExpr::new_default(Arc::new(Column::new("c1", 0))),
            ])]);
            PlanProperties::new(
                eq_properties,
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            )
        }
    }

    impl ExecutionPlan for SortedUnboundedExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.cache
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            Ok(Box::pin(SortedUnboundedStream {
                schema: Arc::new(self.schema.clone()),
                batch_size: self.batch_size,
                offset: 0,
            }))
        }
    }

    #[derive(Debug)]
    pub struct SortedUnboundedStream {
        schema: SchemaRef,
        batch_size: u64,
        offset: u64,
    }

    impl Stream for SortedUnboundedStream {
        type Item = Result<RecordBatch>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            let batch = SortedUnboundedStream::create_record_batch(
                Arc::clone(&self.schema),
                self.offset,
                self.batch_size,
            );
            self.offset += self.batch_size;
            Poll::Ready(Some(Ok(batch)))
        }
    }

    impl RecordBatchStream for SortedUnboundedStream {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }

    impl SortedUnboundedStream {
        fn create_record_batch(
            schema: SchemaRef,
            offset: u64,
            batch_size: u64,
        ) -> RecordBatch {
            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
            let array = UInt64Array::from(values);
            let array_ref: ArrayRef = Arc::new(array);
            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
        }
    }

    #[tokio::test]
    async fn test_in_mem_sort() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let partitions = 4;
        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(csv)),
        ));

        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 400);

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_spill() -> Result<()> {
        // trigger spill w/ 100 batches
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );
1433
        // The input has 100 partitions, each with one batch of 100 rows. Each
        // batch has a single Int32 column with values 0..100, so the total
        // input size is roughly 40000 bytes.
        let partitions = 100;
        let input = test::scan_partitioned(partitions);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        assert_eq!(result.len(), 2);

        // Now, validate metrics
        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 10000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();
        // Processing 40000 bytes of data using 12288 bytes of memory requires
        // at least 3 spills unless we do something really clever. It will spill
        // roughly 9000+ rows and 40000 bytes. We leave a little wiggle room for
        // the actual numbers.
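        // Rough arithmetic behind the bounds: 40000 bytes / 12288 bytes of
        // memory is just over 3 buffer-fulls, so at least 3 spill passes are
        // needed, together covering nearly all of the 10000 input rows.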
        assert!((3..=10).contains(&spill_count));
        assert!((9000..=10000).contains(&spilled_rows));
        assert!((38000..=42000).contains(&spilled_bytes));

        let columns = result[0].columns();

        let i = as_primitive_array::<Int32Type>(&columns[0])?;
        assert_eq!(i.value(0), 0);
        assert_eq!(i.value(i.len() - 1), 81);

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_batch_reservation_error() -> Result<()> {
        // Pick a memory limit and sort_spill_reservation that make the first batch reservation fail.
        // These values assume that the ExternalSorter will reserve 800 bytes for the first batch.
        let expected_batch_reservation = 800;
        let merge_reservation: usize = 0; // Set to 0 for simplicity
        let memory_limit: usize = expected_batch_reservation + merge_reservation - 1; // Just short of what we need
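        // i.e. memory_limit = 800 + 0 - 1 = 799 bytes, one byte short of the
        // first batch's reservation.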

        let session_config =
            SessionConfig::new().with_sort_spill_reservation_bytes(merge_reservation);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(memory_limit, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        let plan = test::scan_partitioned(1);

        // Read the first record batch to assert that our memory limit and sort_spill_reservation
        // settings trigger the test scenario.
        {
            let mut stream = plan.execute(0, Arc::clone(&task_ctx))?;
            let first_batch = stream.next().await.unwrap()?;
            let batch_reservation = get_reserved_byte_for_record_batch(&first_batch);

            assert_eq!(batch_reservation, expected_batch_reservation);
            assert!(memory_limit < (merge_reservation + batch_reservation));
        }

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &plan.schema())?,
                options: SortOptions::default(),
            }]),
            plan,
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await;

        let err = result.unwrap_err();
        assert!(
            matches!(err, DataFusionError::Context(..)),
            "Assertion failed: expected a Context error, but got: {err:?}"
        );

        // Assert that the context error is wrapping a resources exhausted error.
        assert!(
            matches!(err.find_root(), DataFusionError::ResourcesExhausted(_)),
            "Assertion failed: expected a ResourcesExhausted error, but got: {err:?}"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_spill_utf8_strings() -> Result<()> {
        let session_config = SessionConfig::new()
            .with_batch_size(100)
            .with_sort_in_place_threshold_bytes(20 * 1024)
            .with_sort_spill_reservation_bytes(100 * 1024);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(500 * 1024, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // The input has 200 partitions, each with one batch of 100 rows. Each
        // row has a single Utf8 column whose string value is roughly 42 bytes,
        // so the total input size is roughly 840 KB.
        let input = test::scan_partitioned_utf8(200);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
        assert_eq!(num_rows, 20000);

        // Now, validate metrics
        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 20000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();

        // This test case is processing 840KB of data using 400KB of memory. Note
        // that buffered batches can't be dropped until all sorted batches are
        // generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
        // batches.
        // The number of spills is roughly calculated as:
        //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
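        // Working that through (assuming each 100-row Utf8 batch is ~4.2 KB):
        //  200 batches / (100 KiB / 4.2 KB per batch) ≈ 200 / 24 ≈ 8,
        // which lines up with the upper bound asserted below.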

        // If this assertion fails with a large spill count, make sure the
        // following is not happening: during external sorting, each sorted run
        // should be spilled to a single file, but due to the memory limit we
        // may need to append to that file multiple times to spill all the
        // data. Make sure each append is not being written out as a separate
        // file.
        assert!((4..=8).contains(&spill_count));
        assert!((15000..=20000).contains(&spilled_rows));
        assert!((900000..=1000000).contains(&spilled_bytes));

        // Verify that the result is sorted
        let concatenated_result = concat_batches(&schema, &result)?;
        let columns = concatenated_result.columns();
        let string_array = as_string_array(&columns[0]);
        for i in 0..string_array.len() - 1 {
            assert!(string_array.value(i) <= string_array.value(i + 1));
        }

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_fetch_memory_calculation() -> Result<()> {
        // This test scales down the sizes from the test above.
        let avg_batch_size = 400;
        let partitions = 4;

        // Tuples of (fetch, expect_spillage)
        let test_options = vec![
            // Since we don't have a limit (and the memory is less than the total size
            // of all the batches we are processing), we expect it to spill.
            (None, true),
            // When we have a limit, however, the buffered size of batches should fit
            // in memory since it is much lower than the total size of the input batches.
            (Some(1), false),
        ];

        for (fetch, expect_spillage) in test_options {
            let session_config = SessionConfig::new();
            let sort_spill_reservation_bytes = session_config
                .options()
                .execution
                .sort_spill_reservation_bytes;

            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(
                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                    1.0,
                )
                .build_arc()?;
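            // The limit leaves room for roughly (partitions - 1) input batches
            // on top of the merge reservation, so buffering all 4 batches with
            // no fetch must spill, while `fetch = Some(1)` keeps only a tiny
            // top-k state in memory.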
            let task_ctx = Arc::new(
                TaskContext::default()
                    .with_runtime(runtime)
                    .with_session_config(session_config),
            );

            let csv = test::scan_partitioned(partitions);
            let schema = csv.schema();

            let sort_exec = Arc::new(
                SortExec::new(
                    LexOrdering::new(vec![PhysicalSortExpr {
                        expr: col("i", &schema)?,
                        options: SortOptions::default(),
                    }]),
                    Arc::new(CoalescePartitionsExec::new(csv)),
                )
                .with_fetch(fetch),
            );

            let result = collect(
                Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
                Arc::clone(&task_ctx),
            )
            .await?;
            assert_eq!(result.len(), 1);

            let metrics = sort_exec.metrics().unwrap();
            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_sort_metadata() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let field_metadata: HashMap<String, String> =
            vec![("foo".to_string(), "bar".to_string())]
                .into_iter()
                .collect();
        let schema_metadata: HashMap<String, String> =
            vec![("baz".to_string(), "barf".to_string())]
                .into_iter()
                .collect();

        let mut field = Field::new("field_name", DataType::UInt64, true);
        field.set_metadata(field_metadata.clone());
        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
        let schema = Arc::new(schema);

        let data: ArrayRef =
            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data]).unwrap();
        let input =
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
                .unwrap();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("field_name", &schema)?,
                options: SortOptions::default(),
            }]),
            input,
        ));

        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;

        let expected_data: ArrayRef =
            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
        let expected_batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data]).unwrap();

        // Data is correct
        assert_eq!(&vec![expected_batch], &result);

        // explicitly ensure the metadata is present
        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
        assert_eq!(result[0].schema().metadata(), &schema_metadata);

        Ok(())
    }

    #[tokio::test]
    async fn test_lex_sort_by_mixed_types() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
        ]));

        // define data.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(3)]),
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: false,
                    },
                },
            ]),
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
        ));

        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
        assert_eq!(
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            *sort_exec.schema().field(1).data_type()
        );

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 4);
        assert_eq!(result.len(), 1);

        let expected = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                    Some(vec![Some(3)]),
                ])),
            ],
        )?;

        assert_eq!(expected, result[0]);

        Ok(())
    }

    #[tokio::test]
    async fn test_lex_sort_by_float() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, true),
            Field::new("b", DataType::Float64, true),
        ]));

        // define data.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![
                    Some(f32::NAN),
                    None,
                    None,
                    Some(f32::NAN),
                    Some(1.0_f32),
                    Some(1.0_f32),
                    Some(2.0_f32),
                    Some(3.0_f32),
                ])),
                Arc::new(Float64Array::from(vec![
                    Some(200.0_f64),
                    Some(20.0_f64),
                    Some(10.0_f64),
                    Some(100.0_f64),
                    Some(f64::NAN),
                    None,
                    None,
                    Some(f64::NAN),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                },
            ]),
            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
        ));
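        // In Arrow's total order NaN sorts greater than every other float, so
        // with `descending: true, nulls_first: true` on column `a` we expect
        // nulls first, then the NaNs, then 3, 2, 1.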

        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 8);
        assert_eq!(result.len(), 1);

        let columns = result[0].columns();

        assert_eq!(DataType::Float32, *columns[0].data_type());
        assert_eq!(DataType::Float64, *columns[1].data_type());

        let a = as_primitive_array::<Float32Type>(&columns[0])?;
        let b = as_primitive_array::<Float64Type>(&columns[1])?;

        // Convert the result to strings to allow comparing to an expected
        // result containing NaN (NaN != NaN, so direct equality would fail).
        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
            .map(|i| {
                let aval = if a.is_valid(i) {
                    Some(a.value(i).to_string())
                } else {
                    None
                };
                let bval = if b.is_valid(i) {
                    Some(b.value(i).to_string())
                } else {
                    None
                };
                (aval, bval)
            })
            .collect();

        let expected: Vec<(Option<String>, Option<String>)> = vec![
            (None, Some("10".to_owned())),
            (None, Some("20".to_owned())),
            (Some("NaN".to_owned()), Some("100".to_owned())),
            (Some("NaN".to_owned()), Some("200".to_owned())),
            (Some("3".to_owned()), Some("NaN".to_owned())),
            (Some("2".to_owned()), None),
            (Some("1".to_owned()), Some("NaN".to_owned())),
            (Some("1".to_owned()), None),
        ];

        assert_eq!(expected, result);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();
        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            }]),
            blocking_exec,
        ));
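        // Dropping the still-pending future must cancel the sort and release
        // every reference to the blocking input.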

        let fut = collect(sort_exec, Arc::clone(&task_ctx));
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[test]
    fn test_empty_sort_batch() {
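        // A RecordBatch may carry a row count but no columns; sorting such a
        // batch should preserve the row count.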
        let schema = Arc::new(Schema::empty());
        let options = RecordBatchOptions::new().with_row_count(Some(1));
        let batch =
            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
                .unwrap();

        let expressions = LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
            options: SortOptions::default(),
        }]);

        let result = sort_batch(&batch, expressions.as_ref(), None).unwrap();
        assert_eq!(result.num_rows(), 1);
    }

    #[tokio::test]
    async fn topk_unbounded_source() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
        let source = SortedUnboundedExec {
            schema: schema.clone(),
            batch_size: 2,
            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
        };
        let mut plan = SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
                "c1", 0,
            )))]),
            Arc::new(source),
        );
        plan = plan.with_fetch(Some(9));
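        // With a fetch of 9 over already-sorted, unbounded input, the sort can
        // terminate as soon as the first 9 rows have been emitted instead of
        // consuming the endless stream.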

        let batches = collect(Arc::new(plan), task_ctx).await?;
        assert_snapshot!(batches_to_string(&batches), @r#"
            +----+
            | c1 |
            +----+
            | 0  |
            | 1  |
            | 2  |
            | 3  |
            | 4  |
            | 5  |
            | 6  |
            | 7  |
            | 8  |
            +----+
            "#);
        Ok(())
    }
}