Skip to main content

datafusion_physical_plan/
limit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the LIMIT plan
19
20use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{
27    DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
28    SendableRecordBatchStream, Statistics,
29};
30use crate::execution_plan::{Boundedness, CardinalityEffect};
31use crate::{
32    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
33    check_if_same_properties,
34};
35
36use arrow::datatypes::SchemaRef;
37use arrow::record_batch::RecordBatch;
38use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
39use datafusion_execution::TaskContext;
40
41use datafusion_physical_expr::LexOrdering;
42use futures::stream::{Stream, StreamExt};
43use log::trace;
44
45/// Limit execution plan
46#[derive(Debug, Clone)]
47pub struct GlobalLimitExec {
48    /// Input execution plan
49    input: Arc<dyn ExecutionPlan>,
50    /// Number of rows to skip before fetch
51    skip: usize,
52    /// Maximum number of rows to fetch,
53    /// `None` means fetching all rows
54    fetch: Option<usize>,
55    /// Execution metrics
56    metrics: ExecutionPlanMetricsSet,
57    /// Does the limit have to preserve the order of its input, and if so what is it?
58    /// Some optimizations may reorder the input if no particular sort is required
59    required_ordering: Option<LexOrdering>,
60    cache: Arc<PlanProperties>,
61}
62
63impl GlobalLimitExec {
64    /// Create a new GlobalLimitExec
65    pub fn new(input: Arc<dyn ExecutionPlan>, skip: usize, fetch: Option<usize>) -> Self {
66        let cache = Self::compute_properties(&input);
67        GlobalLimitExec {
68            input,
69            skip,
70            fetch,
71            metrics: ExecutionPlanMetricsSet::new(),
72            required_ordering: None,
73            cache: Arc::new(cache),
74        }
75    }
76
77    /// Input execution plan
78    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
79        &self.input
80    }
81
82    /// Number of rows to skip before fetch
83    pub fn skip(&self) -> usize {
84        self.skip
85    }
86
87    /// Maximum number of rows to fetch
88    pub fn fetch(&self) -> Option<usize> {
89        self.fetch
90    }
91
92    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
93    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
94        PlanProperties::new(
95            input.equivalence_properties().clone(), // Equivalence Properties
96            Partitioning::UnknownPartitioning(1),   // Output Partitioning
97            input.pipeline_behavior(),
98            // Limit operations are always bounded since they output a finite number of rows
99            Boundedness::Bounded,
100        )
101    }
102
103    /// Get the required ordering from limit
104    pub fn required_ordering(&self) -> &Option<LexOrdering> {
105        &self.required_ordering
106    }
107
108    /// Set the required ordering for limit
109    pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
110        self.required_ordering = required_ordering;
111    }
112
113    fn with_new_children_and_same_properties(
114        &self,
115        mut children: Vec<Arc<dyn ExecutionPlan>>,
116    ) -> Self {
117        Self {
118            input: children.swap_remove(0),
119            metrics: ExecutionPlanMetricsSet::new(),
120            ..Self::clone(self)
121        }
122    }
123}
124
125impl DisplayAs for GlobalLimitExec {
126    fn fmt_as(
127        &self,
128        t: DisplayFormatType,
129        f: &mut std::fmt::Formatter,
130    ) -> std::fmt::Result {
131        match t {
132            DisplayFormatType::Default | DisplayFormatType::Verbose => {
133                write!(
134                    f,
135                    "GlobalLimitExec: skip={}, fetch={}",
136                    self.skip,
137                    self.fetch
138                        .map_or_else(|| "None".to_string(), |x| x.to_string())
139                )
140            }
141            DisplayFormatType::TreeRender => {
142                if let Some(fetch) = self.fetch {
143                    writeln!(f, "limit={fetch}")?;
144                }
145                write!(f, "skip={}", self.skip)
146            }
147        }
148    }
149}
150
151impl ExecutionPlan for GlobalLimitExec {
152    fn name(&self) -> &'static str {
153        "GlobalLimitExec"
154    }
155
156    /// Return a reference to Any that can be used for downcasting
157    fn as_any(&self) -> &dyn Any {
158        self
159    }
160
161    fn properties(&self) -> &Arc<PlanProperties> {
162        &self.cache
163    }
164
165    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
166        vec![&self.input]
167    }
168
169    fn required_input_distribution(&self) -> Vec<Distribution> {
170        vec![Distribution::SinglePartition]
171    }
172
173    fn maintains_input_order(&self) -> Vec<bool> {
174        vec![true]
175    }
176
177    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
178        vec![false]
179    }
180
181    fn with_new_children(
182        self: Arc<Self>,
183        mut children: Vec<Arc<dyn ExecutionPlan>>,
184    ) -> Result<Arc<dyn ExecutionPlan>> {
185        check_if_same_properties!(self, children);
186        Ok(Arc::new(GlobalLimitExec::new(
187            children.swap_remove(0),
188            self.skip,
189            self.fetch,
190        )))
191    }
192
193    fn execute(
194        &self,
195        partition: usize,
196        context: Arc<TaskContext>,
197    ) -> Result<SendableRecordBatchStream> {
198        trace!("Start GlobalLimitExec::execute for partition: {partition}");
199        // GlobalLimitExec has a single output partition
200        assert_eq_or_internal_err!(
201            partition,
202            0,
203            "GlobalLimitExec invalid partition {partition}"
204        );
205
206        // GlobalLimitExec requires a single input partition
207        assert_eq_or_internal_err!(
208            self.input.output_partitioning().partition_count(),
209            1,
210            "GlobalLimitExec requires a single input partition"
211        );
212
213        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
214        let stream = self.input.execute(0, context)?;
215        Ok(Box::pin(LimitStream::new(
216            stream,
217            self.skip,
218            self.fetch,
219            baseline_metrics,
220        )))
221    }
222
223    fn metrics(&self) -> Option<MetricsSet> {
224        Some(self.metrics.clone_inner())
225    }
226
227    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
228        self.input
229            .partition_statistics(partition)?
230            .with_fetch(self.fetch, self.skip, 1)
231    }
232
233    fn fetch(&self) -> Option<usize> {
234        self.fetch
235    }
236
237    fn supports_limit_pushdown(&self) -> bool {
238        true
239    }
240}
241
242/// LocalLimitExec applies a limit to a single partition
243#[derive(Debug, Clone)]
244pub struct LocalLimitExec {
245    /// Input execution plan
246    input: Arc<dyn ExecutionPlan>,
247    /// Maximum number of rows to return
248    fetch: usize,
249    /// Execution metrics
250    metrics: ExecutionPlanMetricsSet,
251    /// If the child plan is a sort node, after the sort node is removed during
252    /// physical optimization, we should add the required ordering to the limit node
253    required_ordering: Option<LexOrdering>,
254    cache: Arc<PlanProperties>,
255}
256
257impl LocalLimitExec {
258    /// Create a new LocalLimitExec partition
259    pub fn new(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Self {
260        let cache = Self::compute_properties(&input);
261        Self {
262            input,
263            fetch,
264            metrics: ExecutionPlanMetricsSet::new(),
265            required_ordering: None,
266            cache: Arc::new(cache),
267        }
268    }
269
270    /// Input execution plan
271    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
272        &self.input
273    }
274
275    /// Maximum number of rows to fetch
276    pub fn fetch(&self) -> usize {
277        self.fetch
278    }
279
280    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
281    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
282        PlanProperties::new(
283            input.equivalence_properties().clone(), // Equivalence Properties
284            input.output_partitioning().clone(),    // Output Partitioning
285            input.pipeline_behavior(),
286            // Limit operations are always bounded since they output a finite number of rows
287            Boundedness::Bounded,
288        )
289    }
290
291    /// Get the required ordering from limit
292    pub fn required_ordering(&self) -> &Option<LexOrdering> {
293        &self.required_ordering
294    }
295
296    /// Set the required ordering for limit
297    pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
298        self.required_ordering = required_ordering;
299    }
300
301    fn with_new_children_and_same_properties(
302        &self,
303        mut children: Vec<Arc<dyn ExecutionPlan>>,
304    ) -> Self {
305        Self {
306            input: children.swap_remove(0),
307            metrics: ExecutionPlanMetricsSet::new(),
308            ..Self::clone(self)
309        }
310    }
311}
312
313impl DisplayAs for LocalLimitExec {
314    fn fmt_as(
315        &self,
316        t: DisplayFormatType,
317        f: &mut std::fmt::Formatter,
318    ) -> std::fmt::Result {
319        match t {
320            DisplayFormatType::Default | DisplayFormatType::Verbose => {
321                write!(f, "LocalLimitExec: fetch={}", self.fetch)
322            }
323            DisplayFormatType::TreeRender => {
324                write!(f, "limit={}", self.fetch)
325            }
326        }
327    }
328}
329
330impl ExecutionPlan for LocalLimitExec {
331    fn name(&self) -> &'static str {
332        "LocalLimitExec"
333    }
334
335    /// Return a reference to Any that can be used for downcasting
336    fn as_any(&self) -> &dyn Any {
337        self
338    }
339
340    fn properties(&self) -> &Arc<PlanProperties> {
341        &self.cache
342    }
343
344    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
345        vec![&self.input]
346    }
347
348    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
349        vec![false]
350    }
351
352    fn maintains_input_order(&self) -> Vec<bool> {
353        vec![true]
354    }
355
356    fn with_new_children(
357        self: Arc<Self>,
358        children: Vec<Arc<dyn ExecutionPlan>>,
359    ) -> Result<Arc<dyn ExecutionPlan>> {
360        check_if_same_properties!(self, children);
361        match children.len() {
362            1 => Ok(Arc::new(LocalLimitExec::new(
363                Arc::clone(&children[0]),
364                self.fetch,
365            ))),
366            _ => internal_err!("LocalLimitExec wrong number of children"),
367        }
368    }
369
370    fn execute(
371        &self,
372        partition: usize,
373        context: Arc<TaskContext>,
374    ) -> Result<SendableRecordBatchStream> {
375        trace!(
376            "Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}",
377            partition,
378            context.session_id(),
379            context.task_id()
380        );
381        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
382        let stream = self.input.execute(partition, context)?;
383        Ok(Box::pin(LimitStream::new(
384            stream,
385            0,
386            Some(self.fetch),
387            baseline_metrics,
388        )))
389    }
390
391    fn metrics(&self) -> Option<MetricsSet> {
392        Some(self.metrics.clone_inner())
393    }
394
395    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
396        self.input
397            .partition_statistics(partition)?
398            .with_fetch(Some(self.fetch), 0, 1)
399    }
400
401    fn fetch(&self) -> Option<usize> {
402        Some(self.fetch)
403    }
404
405    fn supports_limit_pushdown(&self) -> bool {
406        true
407    }
408
409    fn cardinality_effect(&self) -> CardinalityEffect {
410        CardinalityEffect::LowerEqual
411    }
412}
413
414/// A Limit stream skips `skip` rows, and then fetch up to `fetch` rows.
415pub struct LimitStream {
416    /// The remaining number of rows to skip
417    skip: usize,
418    /// The remaining number of rows to produce
419    fetch: usize,
420    /// The input to read from. This is set to None once the limit is
421    /// reached to enable early termination
422    input: Option<SendableRecordBatchStream>,
423    /// Copy of the input schema
424    schema: SchemaRef,
425    /// Execution time metrics
426    baseline_metrics: BaselineMetrics,
427}
428
429impl LimitStream {
430    pub fn new(
431        input: SendableRecordBatchStream,
432        skip: usize,
433        fetch: Option<usize>,
434        baseline_metrics: BaselineMetrics,
435    ) -> Self {
436        let schema = input.schema();
437        Self {
438            skip,
439            fetch: fetch.unwrap_or(usize::MAX),
440            input: Some(input),
441            schema,
442            baseline_metrics,
443        }
444    }
445
446    fn poll_and_skip(
447        &mut self,
448        cx: &mut Context<'_>,
449    ) -> Poll<Option<Result<RecordBatch>>> {
450        let input = self.input.as_mut().unwrap();
451        loop {
452            let poll = input.poll_next_unpin(cx);
453            let poll = poll.map_ok(|batch| {
454                if batch.num_rows() <= self.skip {
455                    self.skip -= batch.num_rows();
456                    RecordBatch::new_empty(input.schema())
457                } else {
458                    let new_batch = batch.slice(self.skip, batch.num_rows() - self.skip);
459                    self.skip = 0;
460                    new_batch
461                }
462            });
463
464            match &poll {
465                Poll::Ready(Some(Ok(batch))) => {
466                    if batch.num_rows() > 0 {
467                        break poll;
468                    } else {
469                        // Continue to poll input stream
470                    }
471                }
472                Poll::Ready(Some(Err(_e))) => break poll,
473                Poll::Ready(None) => break poll,
474                Poll::Pending => break poll,
475            }
476        }
477    }
478
479    /// Fetches from the batch
480    fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
481        // records time on drop
482        let _timer = self.baseline_metrics.elapsed_compute().timer();
483        if self.fetch == 0 {
484            self.input = None; // Clear input so it can be dropped early
485            None
486        } else if batch.num_rows() < self.fetch {
487            //
488            self.fetch -= batch.num_rows();
489            Some(batch)
490        } else if batch.num_rows() >= self.fetch {
491            let batch_rows = self.fetch;
492            self.fetch = 0;
493            self.input = None; // Clear input so it can be dropped early
494
495            // It is guaranteed that batch_rows is <= batch.num_rows
496            Some(batch.slice(0, batch_rows))
497        } else {
498            unreachable!()
499        }
500    }
501}
502
503impl Stream for LimitStream {
504    type Item = Result<RecordBatch>;
505
506    fn poll_next(
507        mut self: Pin<&mut Self>,
508        cx: &mut Context<'_>,
509    ) -> Poll<Option<Self::Item>> {
510        let fetch_started = self.skip == 0;
511        let poll = match &mut self.input {
512            Some(input) => {
513                let poll = if fetch_started {
514                    input.poll_next_unpin(cx)
515                } else {
516                    self.poll_and_skip(cx)
517                };
518
519                poll.map(|x| match x {
520                    Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
521                    other => other,
522                })
523            }
524            // Input has been cleared
525            None => Poll::Ready(None),
526        };
527
528        self.baseline_metrics.record_poll(poll)
529    }
530}
531
532impl RecordBatchStream for LimitStream {
533    /// Get the schema
534    fn schema(&self) -> SchemaRef {
535        Arc::clone(&self.schema)
536    }
537}
538
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coalesce_partitions::CoalescePartitionsExec;
    use crate::common::collect;
    use crate::test;

    use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
    use arrow::array::RecordBatchOptions;
    use arrow::datatypes::Schema;
    use datafusion_common::stats::Precision;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_expr::expressions::col;

    #[tokio::test]
    async fn limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        // Input should have 4 partitions
        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let limit =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 0, Some(7));

        // The partitions are coalesced into a single stream before the limit
        let iter = limit.execute(0, task_ctx)?;
        let batches = collect(iter).await?;

        // Only the 7 rows requested by the limit should be returned
        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 7);

        Ok(())
    }

    #[tokio::test]
    async fn limit_early_shutdown() -> Result<()> {
        let batches = vec![
            test::make_partition(5),
            test::make_partition(10),
            test::make_partition(15),
            test::make_partition(20),
            test::make_partition(25),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        // Limit of six needs to consume the entire first record batch
        // (5 rows) and 1 row from the second (1 row)
        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        // Only 6 rows should have been produced
        assert_eq!(num_rows, 6);

        // Only the first two batches should be consumed
        assert_eq!(index.value(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn limit_equals_batch_size() -> Result<()> {
        let batches = vec![
            test::make_partition(6),
            test::make_partition(6),
            test::make_partition(6),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        // Limit of six needs to consume the entire first record batch
        // (6 rows) and stop immediately
        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        // Only 6 rows should have been produced
        assert_eq!(num_rows, 6);

        // Only the first batch should be consumed
        assert_eq!(index.value(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn limit_no_column() -> Result<()> {
        let batches = vec![
            make_batch_no_column(6),
            make_batch_no_column(6),
            make_batch_no_column(6),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        // Limit of six needs to consume the entire first record batch
        // (6 rows) and stop immediately
        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        // Only 6 rows should have been produced
        assert_eq!(num_rows, 6);

        // Only the first batch should be consumed
        assert_eq!(index.value(), 1);

        Ok(())
    }

    // Test cases for "skip"
    /// Runs a `GlobalLimitExec(skip, fetch)` over 4 coalesced partitions of
    /// 100 rows each and returns the number of rows produced.
    async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
        let task_ctx = Arc::new(TaskContext::default());

        // 4 partitions @ 100 rows apiece
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

        // Collect everything from the limit's single output partition
        let iter = offset.execute(0, task_ctx)?;
        let batches = collect(iter).await?;
        Ok(batches.iter().map(|batch| batch.num_rows()).sum())
    }

    #[tokio::test]
    async fn skip_none_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(0, None).await?;
        assert_eq!(row_count, 400);
        Ok(())
    }

    #[tokio::test]
    async fn skip_none_fetch_50() -> Result<()> {
        let row_count = skip_and_fetch(0, Some(50)).await?;
        assert_eq!(row_count, 50);
        Ok(())
    }

    #[tokio::test]
    async fn skip_3_fetch_none() -> Result<()> {
        // There are total of 400 rows, we skipped 3 rows (offset = 3)
        let row_count = skip_and_fetch(3, None).await?;
        assert_eq!(row_count, 397);
        Ok(())
    }

    #[tokio::test]
    async fn skip_3_fetch_10_stats() -> Result<()> {
        // There are total of 400 rows, we skipped 3 rows (offset = 3)
        let row_count = skip_and_fetch(3, Some(10)).await?;
        assert_eq!(row_count, 10);
        Ok(())
    }

    #[tokio::test]
    async fn skip_400_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(400, None).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn skip_400_fetch_1() -> Result<()> {
        // There are a total of 400 rows
        let row_count = skip_and_fetch(400, Some(1)).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn skip_401_fetch_none() -> Result<()> {
        // There are total of 400 rows, we skipped 401 rows (offset = 401)
        let row_count = skip_and_fetch(401, None).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn test_row_number_statistics_for_global_limit() -> Result<()> {
        let row_count = row_number_statistics_for_global_limit(0, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(10));

        let row_count = row_number_statistics_for_global_limit(5, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(10));

        let row_count = row_number_statistics_for_global_limit(400, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(0));

        let row_count = row_number_statistics_for_global_limit(398, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count = row_number_statistics_for_global_limit(398, Some(1)).await?;
        assert_eq!(row_count, Precision::Exact(1));

        let row_count = row_number_statistics_for_global_limit(398, None).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count =
            row_number_statistics_for_global_limit(0, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Exact(400));

        let row_count =
            row_number_statistics_for_global_limit(398, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(0, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(10));

        let row_count =
            row_number_inexact_statistics_for_global_limit(5, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(10));

        let row_count =
            row_number_inexact_statistics_for_global_limit(400, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(0));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(1)).await?;
        assert_eq!(row_count, Precision::Inexact(1));

        let row_count = row_number_inexact_statistics_for_global_limit(398, None).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(0, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Inexact(400));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        Ok(())
    }

    #[tokio::test]
    async fn test_row_number_statistics_for_local_limit() -> Result<()> {
        // NOTE(review): each of the 4 partitions may emit up to 10 rows, so the
        // real output can be 40 rows. This asserts the current estimate, which
        // passes `n_partitions = 1` to `Statistics::with_fetch`. Confirm this
        // is intended.
        let row_count = row_number_statistics_for_local_limit(4, 10).await?;
        assert_eq!(row_count, Precision::Exact(10));

        Ok(())
    }

    /// Returns the row-count statistic of a `GlobalLimitExec(skip, fetch)`
    /// over 4 coalesced partitions (exact input statistics).
    async fn row_number_statistics_for_global_limit(
        skip: usize,
        fetch: Option<usize>,
    ) -> Result<Precision<usize>> {
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Builds a single grouping set that groups by the named `columns`.
    pub fn build_group_by(
        input_schema: &SchemaRef,
        columns: Vec<String>,
    ) -> PhysicalGroupBy {
        let group_by_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = columns
            .iter()
            .map(|column| (col(column, input_schema).unwrap(), column.to_string()))
            .collect();
        PhysicalGroupBy::new_single(group_by_expr)
    }

    /// Same as `row_number_statistics_for_global_limit`, but the input
    /// statistics are made inexact by an intermediate aggregation.
    async fn row_number_inexact_statistics_for_global_limit(
        skip: usize,
        fetch: Option<usize>,
    ) -> Result<Precision<usize>> {
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        // Adding a "GROUP BY i" changes the input stats from Exact to Inexact.
        let agg = AggregateExec::try_new(
            AggregateMode::Final,
            build_group_by(&csv.schema(), vec!["i".to_string()]),
            vec![],
            vec![],
            Arc::clone(&csv),
            Arc::clone(&csv.schema()),
        )?;
        let agg_exec: Arc<dyn ExecutionPlan> = Arc::new(agg);

        let offset = GlobalLimitExec::new(
            Arc::new(CoalescePartitionsExec::new(agg_exec)),
            skip,
            fetch,
        );

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Returns the whole-plan row-count statistic of a
    /// `LocalLimitExec(fetch)` over `num_partitions` scanned partitions.
    async fn row_number_statistics_for_local_limit(
        num_partitions: usize,
        fetch: usize,
    ) -> Result<Precision<usize>> {
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset = LocalLimitExec::new(csv, fetch);

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Return a RecordBatch with no columns and a row count of `sz`
    fn make_batch_no_column(sz: usize) -> RecordBatch {
        let schema = Arc::new(Schema::empty());

        let options = RecordBatchOptions::new().with_row_count(Option::from(sz));
        RecordBatch::try_new_with_options(schema, vec![], &options).unwrap()
    }
}