Skip to main content

datafusion_physical_plan/
limit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the LIMIT plan
19
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26    DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27    SendableRecordBatchStream, Statistics,
28};
29use crate::execution_plan::{Boundedness, CardinalityEffect};
30use crate::{
31    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
32    check_if_same_properties,
33};
34
35use arrow::datatypes::SchemaRef;
36use arrow::record_batch::RecordBatch;
37use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
38use datafusion_execution::TaskContext;
39
40use datafusion_physical_expr::LexOrdering;
41use futures::stream::{Stream, StreamExt};
42use log::trace;
43
44/// Limit execution plan
45#[derive(Debug, Clone)]
46pub struct GlobalLimitExec {
47    /// Input execution plan
48    input: Arc<dyn ExecutionPlan>,
49    /// Number of rows to skip before fetch
50    skip: usize,
51    /// Maximum number of rows to fetch,
52    /// `None` means fetching all rows
53    fetch: Option<usize>,
54    /// Execution metrics
55    metrics: ExecutionPlanMetricsSet,
56    /// Does the limit have to preserve the order of its input, and if so what is it?
57    /// Some optimizations may reorder the input if no particular sort is required
58    required_ordering: Option<LexOrdering>,
59    cache: Arc<PlanProperties>,
60}
61
62impl GlobalLimitExec {
63    /// Create a new GlobalLimitExec
64    pub fn new(input: Arc<dyn ExecutionPlan>, skip: usize, fetch: Option<usize>) -> Self {
65        let cache = Self::compute_properties(&input);
66        GlobalLimitExec {
67            input,
68            skip,
69            fetch,
70            metrics: ExecutionPlanMetricsSet::new(),
71            required_ordering: None,
72            cache: Arc::new(cache),
73        }
74    }
75
76    /// Input execution plan
77    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
78        &self.input
79    }
80
81    /// Number of rows to skip before fetch
82    pub fn skip(&self) -> usize {
83        self.skip
84    }
85
86    /// Maximum number of rows to fetch
87    pub fn fetch(&self) -> Option<usize> {
88        self.fetch
89    }
90
91    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
92    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
93        PlanProperties::new(
94            input.equivalence_properties().clone(), // Equivalence Properties
95            Partitioning::UnknownPartitioning(1),   // Output Partitioning
96            input.pipeline_behavior(),
97            // Limit operations are always bounded since they output a finite number of rows
98            Boundedness::Bounded,
99        )
100    }
101
102    /// Get the required ordering from limit
103    pub fn required_ordering(&self) -> &Option<LexOrdering> {
104        &self.required_ordering
105    }
106
107    /// Set the required ordering for limit
108    pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
109        self.required_ordering = required_ordering;
110    }
111
112    fn with_new_children_and_same_properties(
113        &self,
114        mut children: Vec<Arc<dyn ExecutionPlan>>,
115    ) -> Self {
116        Self {
117            input: children.swap_remove(0),
118            metrics: ExecutionPlanMetricsSet::new(),
119            ..Self::clone(self)
120        }
121    }
122}
123
124impl DisplayAs for GlobalLimitExec {
125    fn fmt_as(
126        &self,
127        t: DisplayFormatType,
128        f: &mut std::fmt::Formatter,
129    ) -> std::fmt::Result {
130        match t {
131            DisplayFormatType::Default | DisplayFormatType::Verbose => {
132                write!(
133                    f,
134                    "GlobalLimitExec: skip={}, fetch={}",
135                    self.skip,
136                    self.fetch
137                        .map_or_else(|| "None".to_string(), |x| x.to_string())
138                )
139            }
140            DisplayFormatType::TreeRender => {
141                if let Some(fetch) = self.fetch {
142                    writeln!(f, "limit={fetch}")?;
143                }
144                write!(f, "skip={}", self.skip)
145            }
146        }
147    }
148}
149
150impl ExecutionPlan for GlobalLimitExec {
151    fn name(&self) -> &'static str {
152        "GlobalLimitExec"
153    }
154
155    /// Return a reference to Any that can be used for downcasting
156    fn properties(&self) -> &Arc<PlanProperties> {
157        &self.cache
158    }
159
160    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
161        vec![&self.input]
162    }
163
164    fn required_input_distribution(&self) -> Vec<Distribution> {
165        vec![Distribution::SinglePartition]
166    }
167
168    fn maintains_input_order(&self) -> Vec<bool> {
169        vec![true]
170    }
171
172    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
173        vec![false]
174    }
175
176    fn with_new_children(
177        self: Arc<Self>,
178        mut children: Vec<Arc<dyn ExecutionPlan>>,
179    ) -> Result<Arc<dyn ExecutionPlan>> {
180        check_if_same_properties!(self, children);
181        Ok(Arc::new(GlobalLimitExec::new(
182            children.swap_remove(0),
183            self.skip,
184            self.fetch,
185        )))
186    }
187
188    fn execute(
189        &self,
190        partition: usize,
191        context: Arc<TaskContext>,
192    ) -> Result<SendableRecordBatchStream> {
193        trace!("Start GlobalLimitExec::execute for partition: {partition}");
194        // GlobalLimitExec has a single output partition
195        assert_eq_or_internal_err!(
196            partition,
197            0,
198            "GlobalLimitExec invalid partition {partition}"
199        );
200
201        // GlobalLimitExec requires a single input partition
202        assert_eq_or_internal_err!(
203            self.input.output_partitioning().partition_count(),
204            1,
205            "GlobalLimitExec requires a single input partition"
206        );
207
208        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
209        let stream = self.input.execute(0, context)?;
210        Ok(Box::pin(LimitStream::new(
211            stream,
212            self.skip,
213            self.fetch,
214            baseline_metrics,
215        )))
216    }
217
218    fn metrics(&self) -> Option<MetricsSet> {
219        Some(self.metrics.clone_inner())
220    }
221
222    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
223        let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
224        Ok(Arc::new(stats.with_fetch(self.fetch, self.skip, 1)?))
225    }
226
227    fn fetch(&self) -> Option<usize> {
228        self.fetch
229    }
230
231    fn supports_limit_pushdown(&self) -> bool {
232        true
233    }
234}
235
236/// LocalLimitExec applies a limit to a single partition
237#[derive(Debug, Clone)]
238pub struct LocalLimitExec {
239    /// Input execution plan
240    input: Arc<dyn ExecutionPlan>,
241    /// Maximum number of rows to return
242    fetch: usize,
243    /// Execution metrics
244    metrics: ExecutionPlanMetricsSet,
245    /// If the child plan is a sort node, after the sort node is removed during
246    /// physical optimization, we should add the required ordering to the limit node
247    required_ordering: Option<LexOrdering>,
248    cache: Arc<PlanProperties>,
249}
250
251impl LocalLimitExec {
252    /// Create a new LocalLimitExec partition
253    pub fn new(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Self {
254        let cache = Self::compute_properties(&input);
255        Self {
256            input,
257            fetch,
258            metrics: ExecutionPlanMetricsSet::new(),
259            required_ordering: None,
260            cache: Arc::new(cache),
261        }
262    }
263
264    /// Input execution plan
265    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
266        &self.input
267    }
268
269    /// Maximum number of rows to fetch
270    pub fn fetch(&self) -> usize {
271        self.fetch
272    }
273
274    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
275    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
276        PlanProperties::new(
277            input.equivalence_properties().clone(), // Equivalence Properties
278            input.output_partitioning().clone(),    // Output Partitioning
279            input.pipeline_behavior(),
280            // Limit operations are always bounded since they output a finite number of rows
281            Boundedness::Bounded,
282        )
283    }
284
285    /// Get the required ordering from limit
286    pub fn required_ordering(&self) -> &Option<LexOrdering> {
287        &self.required_ordering
288    }
289
290    /// Set the required ordering for limit
291    pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
292        self.required_ordering = required_ordering;
293    }
294
295    fn with_new_children_and_same_properties(
296        &self,
297        mut children: Vec<Arc<dyn ExecutionPlan>>,
298    ) -> Self {
299        Self {
300            input: children.swap_remove(0),
301            metrics: ExecutionPlanMetricsSet::new(),
302            ..Self::clone(self)
303        }
304    }
305}
306
307impl DisplayAs for LocalLimitExec {
308    fn fmt_as(
309        &self,
310        t: DisplayFormatType,
311        f: &mut std::fmt::Formatter,
312    ) -> std::fmt::Result {
313        match t {
314            DisplayFormatType::Default | DisplayFormatType::Verbose => {
315                write!(f, "LocalLimitExec: fetch={}", self.fetch)
316            }
317            DisplayFormatType::TreeRender => {
318                write!(f, "limit={}", self.fetch)
319            }
320        }
321    }
322}
323
324impl ExecutionPlan for LocalLimitExec {
325    fn name(&self) -> &'static str {
326        "LocalLimitExec"
327    }
328
329    /// Return a reference to Any that can be used for downcasting
330    fn properties(&self) -> &Arc<PlanProperties> {
331        &self.cache
332    }
333
334    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
335        vec![&self.input]
336    }
337
338    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
339        vec![false]
340    }
341
342    fn maintains_input_order(&self) -> Vec<bool> {
343        vec![true]
344    }
345
346    fn with_new_children(
347        self: Arc<Self>,
348        children: Vec<Arc<dyn ExecutionPlan>>,
349    ) -> Result<Arc<dyn ExecutionPlan>> {
350        check_if_same_properties!(self, children);
351        match children.len() {
352            1 => Ok(Arc::new(LocalLimitExec::new(
353                Arc::clone(&children[0]),
354                self.fetch,
355            ))),
356            _ => internal_err!("LocalLimitExec wrong number of children"),
357        }
358    }
359
360    fn execute(
361        &self,
362        partition: usize,
363        context: Arc<TaskContext>,
364    ) -> Result<SendableRecordBatchStream> {
365        trace!(
366            "Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}",
367            partition,
368            context.session_id(),
369            context.task_id()
370        );
371        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
372        let stream = self.input.execute(partition, context)?;
373        Ok(Box::pin(LimitStream::new(
374            stream,
375            0,
376            Some(self.fetch),
377            baseline_metrics,
378        )))
379    }
380
381    fn metrics(&self) -> Option<MetricsSet> {
382        Some(self.metrics.clone_inner())
383    }
384
385    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
386        let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
387        Ok(Arc::new(stats.with_fetch(Some(self.fetch), 0, 1)?))
388    }
389
390    fn fetch(&self) -> Option<usize> {
391        Some(self.fetch)
392    }
393
394    fn supports_limit_pushdown(&self) -> bool {
395        true
396    }
397
398    fn cardinality_effect(&self) -> CardinalityEffect {
399        CardinalityEffect::LowerEqual
400    }
401}
402
403/// A Limit stream skips `skip` rows, and then fetch up to `fetch` rows.
404pub struct LimitStream {
405    /// The remaining number of rows to skip
406    skip: usize,
407    /// The remaining number of rows to produce
408    fetch: usize,
409    /// The input to read from. This is set to None once the limit is
410    /// reached to enable early termination
411    input: Option<SendableRecordBatchStream>,
412    /// Copy of the input schema
413    schema: SchemaRef,
414    /// Execution time metrics
415    baseline_metrics: BaselineMetrics,
416}
417
418impl LimitStream {
419    pub fn new(
420        input: SendableRecordBatchStream,
421        skip: usize,
422        fetch: Option<usize>,
423        baseline_metrics: BaselineMetrics,
424    ) -> Self {
425        let schema = input.schema();
426        Self {
427            skip,
428            fetch: fetch.unwrap_or(usize::MAX),
429            input: Some(input),
430            schema,
431            baseline_metrics,
432        }
433    }
434
435    fn poll_and_skip(
436        &mut self,
437        cx: &mut Context<'_>,
438    ) -> Poll<Option<Result<RecordBatch>>> {
439        let input = self.input.as_mut().unwrap();
440        loop {
441            let poll = input.poll_next_unpin(cx);
442            let poll = poll.map_ok(|batch| {
443                if batch.num_rows() <= self.skip {
444                    self.skip -= batch.num_rows();
445                    RecordBatch::new_empty(input.schema())
446                } else {
447                    let new_batch = batch.slice(self.skip, batch.num_rows() - self.skip);
448                    self.skip = 0;
449                    new_batch
450                }
451            });
452
453            match &poll {
454                Poll::Ready(Some(Ok(batch))) => {
455                    if batch.num_rows() > 0 {
456                        break poll;
457                    } else {
458                        // Continue to poll input stream
459                    }
460                }
461                Poll::Ready(Some(Err(_e))) => break poll,
462                Poll::Ready(None) => break poll,
463                Poll::Pending => break poll,
464            }
465        }
466    }
467
468    /// Fetches from the batch
469    fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
470        // records time on drop
471        let _timer = self.baseline_metrics.elapsed_compute().timer();
472        if self.fetch == 0 {
473            self.input = None; // Clear input so it can be dropped early
474            None
475        } else if batch.num_rows() < self.fetch {
476            //
477            self.fetch -= batch.num_rows();
478            Some(batch)
479        } else if batch.num_rows() >= self.fetch {
480            let batch_rows = self.fetch;
481            self.fetch = 0;
482            self.input = None; // Clear input so it can be dropped early
483
484            // It is guaranteed that batch_rows is <= batch.num_rows
485            Some(batch.slice(0, batch_rows))
486        } else {
487            unreachable!()
488        }
489    }
490}
491
492impl Stream for LimitStream {
493    type Item = Result<RecordBatch>;
494
495    fn poll_next(
496        mut self: Pin<&mut Self>,
497        cx: &mut Context<'_>,
498    ) -> Poll<Option<Self::Item>> {
499        let fetch_started = self.skip == 0;
500        let poll = match &mut self.input {
501            Some(input) => {
502                let poll = if fetch_started {
503                    input.poll_next_unpin(cx)
504                } else {
505                    self.poll_and_skip(cx)
506                };
507
508                poll.map(|x| match x {
509                    Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
510                    other => other,
511                })
512            }
513            // Input has been cleared
514            None => Poll::Ready(None),
515        };
516
517        self.baseline_metrics.record_poll(poll)
518    }
519}
520
521impl RecordBatchStream for LimitStream {
522    /// Get the schema
523    fn schema(&self) -> SchemaRef {
524        Arc::clone(&self.schema)
525    }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coalesce_partitions::CoalescePartitionsExec;
    use crate::common::collect;
    use crate::test;

    use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
    use arrow::array::RecordBatchOptions;
    use arrow::datatypes::Schema;
    use datafusion_common::stats::Precision;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_expr::expressions::col;

    #[tokio::test]
    async fn limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        // Input should have 4 partitions
        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let limit =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 0, Some(7));

        // Execute the single output partition and gather every batch
        let iter = limit.execute(0, task_ctx)?;
        let batches = collect(iter).await?;

        // The limit should let exactly 7 rows through
        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 7);

        Ok(())
    }

    #[tokio::test]
    async fn limit_early_shutdown() -> Result<()> {
        let batches = vec![
            test::make_partition(5),
            test::make_partition(10),
            test::make_partition(15),
            test::make_partition(20),
            test::make_partition(25),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        // Limit of six needs to consume the entire first record batch
        // (5 rows) and 1 row from the second
        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        // Only 6 rows should have been produced
        assert_eq!(num_rows, 6);

        // Only the first two batches should be consumed
        assert_eq!(index.value(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn limit_equals_batch_size() -> Result<()> {
        let batches = vec![
            test::make_partition(6),
            test::make_partition(6),
            test::make_partition(6),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        // Limit of six needs to consume the entire first record batch
        // (6 rows) and stop immediately
        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        // Only 6 rows should have been produced
        assert_eq!(num_rows, 6);

        // Only the first batch should be consumed
        assert_eq!(index.value(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn limit_no_column() -> Result<()> {
        let batches = vec![
            make_batch_no_column(6),
            make_batch_no_column(6),
            make_batch_no_column(6),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        // Limit of six needs to consume the entire first record batch
        // (6 rows) and stop immediately
        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        // Only 6 rows should have been produced
        assert_eq!(num_rows, 6);

        // Only the first batch should be consumed
        assert_eq!(index.value(), 1);

        Ok(())
    }

    /// Runs a `GlobalLimitExec` with the given `skip`/`fetch` over the
    /// 4-partition test scan (400 rows in total) and returns how many rows
    /// come out.
    async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
        let task_ctx = Arc::new(TaskContext::default());

        // 4 partitions @ 100 rows apiece
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

        // Execute the single output partition and count the rows produced
        let iter = offset.execute(0, task_ctx)?;
        let batches = collect(iter).await?;
        Ok(batches.iter().map(|batch| batch.num_rows()).sum())
    }

    #[tokio::test]
    async fn skip_none_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(0, None).await?;
        assert_eq!(row_count, 400);
        Ok(())
    }

    #[tokio::test]
    async fn skip_none_fetch_50() -> Result<()> {
        let row_count = skip_and_fetch(0, Some(50)).await?;
        assert_eq!(row_count, 50);
        Ok(())
    }

    #[tokio::test]
    async fn skip_3_fetch_none() -> Result<()> {
        // There are a total of 400 rows, we skipped 3 rows (offset = 3)
        let row_count = skip_and_fetch(3, None).await?;
        assert_eq!(row_count, 397);
        Ok(())
    }

    #[tokio::test]
    async fn skip_3_fetch_10_stats() -> Result<()> {
        // There are a total of 400 rows, we skipped 3 rows (offset = 3)
        // and then fetched 10
        let row_count = skip_and_fetch(3, Some(10)).await?;
        assert_eq!(row_count, 10);
        Ok(())
    }

    #[tokio::test]
    async fn skip_400_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(400, None).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn skip_400_fetch_1() -> Result<()> {
        // There are a total of 400 rows
        let row_count = skip_and_fetch(400, Some(1)).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn skip_401_fetch_none() -> Result<()> {
        // There are a total of 400 rows, we skipped 401 rows (offset = 401)
        let row_count = skip_and_fetch(401, None).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn test_row_number_statistics_for_global_limit() -> Result<()> {
        let row_count = row_number_statistics_for_global_limit(0, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(10));

        let row_count = row_number_statistics_for_global_limit(5, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(10));

        let row_count = row_number_statistics_for_global_limit(400, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(0));

        let row_count = row_number_statistics_for_global_limit(398, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count = row_number_statistics_for_global_limit(398, Some(1)).await?;
        assert_eq!(row_count, Precision::Exact(1));

        let row_count = row_number_statistics_for_global_limit(398, None).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count =
            row_number_statistics_for_global_limit(0, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Exact(400));

        let row_count =
            row_number_statistics_for_global_limit(398, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(0, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(10));

        let row_count =
            row_number_inexact_statistics_for_global_limit(5, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(10));

        // Input was Inexact, so an `nr <= skip` outcome must remain Inexact:
        // the inexact estimate could be wrong, so we cannot promote 0 to
        // Exact.
        let row_count =
            row_number_inexact_statistics_for_global_limit(400, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(0));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(1)).await?;
        assert_eq!(row_count, Precision::Inexact(1));

        let row_count = row_number_inexact_statistics_for_global_limit(398, None).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(0, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Inexact(400));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        Ok(())
    }

    #[tokio::test]
    async fn test_row_number_statistics_for_local_limit() -> Result<()> {
        let row_count = row_number_statistics_for_local_limit(4, 10).await?;
        assert_eq!(row_count, Precision::Exact(10));

        Ok(())
    }

    /// Returns the `num_rows` statistic reported by a `GlobalLimitExec` with
    /// the given `skip`/`fetch` placed over the coalesced 4-partition test
    /// scan.
    async fn row_number_statistics_for_global_limit(
        skip: usize,
        fetch: Option<usize>,
    ) -> Result<Precision<usize>> {
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Builds a single-grouping-set `PhysicalGroupBy` over the named columns
    /// of `input_schema`, using each column name as its output alias.
    ///
    /// Panics if a column name cannot be resolved against the schema.
    pub fn build_group_by(
        input_schema: &SchemaRef,
        columns: Vec<String>,
    ) -> PhysicalGroupBy {
        let group_by_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = columns
            .into_iter()
            .map(|column| {
                let expr = col(&column, input_schema).unwrap();
                (expr, column)
            })
            .collect();
        PhysicalGroupBy::new_single(group_by_expr)
    }

    /// Same as `row_number_statistics_for_global_limit`, but with an
    /// aggregate placed under the limit so that the input statistics are
    /// inexact.
    async fn row_number_inexact_statistics_for_global_limit(
        skip: usize,
        fetch: Option<usize>,
    ) -> Result<Precision<usize>> {
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        // Adding a "GROUP BY i" changes the input stats from Exact to Inexact.
        let agg = AggregateExec::try_new(
            AggregateMode::Final,
            build_group_by(&csv.schema(), vec!["i".to_string()]),
            vec![],
            vec![],
            Arc::clone(&csv),
            csv.schema(),
        )?;
        let agg_exec: Arc<dyn ExecutionPlan> = Arc::new(agg);

        let offset = GlobalLimitExec::new(
            Arc::new(CoalescePartitionsExec::new(agg_exec)),
            skip,
            fetch,
        );

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Returns the `num_rows` statistic reported by a `LocalLimitExec` with
    /// the given `fetch` placed directly over the partitioned test scan.
    async fn row_number_statistics_for_local_limit(
        num_partitions: usize,
        fetch: usize,
    ) -> Result<Precision<usize>> {
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset = LocalLimitExec::new(csv, fetch);

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Return a RecordBatch with no columns and a row count of `sz`
    fn make_batch_no_column(sz: usize) -> RecordBatch {
        let schema = Arc::new(Schema::empty());

        let options = RecordBatchOptions::new().with_row_count(Some(sz));
        RecordBatch::try_new_with_options(schema, vec![], &options).unwrap()
    }
}