1use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{
27 DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
28 SendableRecordBatchStream, Statistics,
29};
30use crate::execution_plan::{Boundedness, CardinalityEffect};
31use crate::{
32 DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
33 check_if_same_properties,
34};
35
36use arrow::datatypes::SchemaRef;
37use arrow::record_batch::RecordBatch;
38use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
39use datafusion_execution::TaskContext;
40
41use datafusion_physical_expr::LexOrdering;
42use futures::stream::{Stream, StreamExt};
43use log::trace;
44
/// Limit execution plan: skips `skip` rows and then returns at most `fetch`
/// rows from its single input partition.
#[derive(Debug, Clone)]
pub struct GlobalLimitExec {
    /// Input execution plan (executed as a single partition; see `execute`)
    input: Arc<dyn ExecutionPlan>,
    /// Number of rows to skip before emitting any rows
    skip: usize,
    /// Maximum number of rows to return after skipping; `None` means no limit
    fetch: Option<usize>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Ordering required of the input, if any (set via `set_required_ordering`)
    required_ordering: Option<LexOrdering>,
    /// Cached plan properties (equivalences, partitioning, boundedness)
    cache: Arc<PlanProperties>,
}
62
impl GlobalLimitExec {
    /// Create a new `GlobalLimitExec` that skips `skip` rows and returns at
    /// most `fetch` rows (`None` means no limit).
    pub fn new(input: Arc<dyn ExecutionPlan>, skip: usize, fetch: Option<usize>) -> Self {
        let cache = Self::compute_properties(&input);
        GlobalLimitExec {
            input,
            skip,
            fetch,
            metrics: ExecutionPlanMetricsSet::new(),
            required_ordering: None,
            cache: Arc::new(cache),
        }
    }

    /// Input execution plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Number of rows to skip before fetching
    pub fn skip(&self) -> usize {
        self.skip
    }

    /// Maximum number of rows to fetch, `None` means fetching all rows
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Compute plan properties: equivalence properties and pipeline behavior
    /// are inherited from the input, but the output is always a single,
    /// bounded partition.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        PlanProperties::new(
            input.equivalence_properties().clone(), Partitioning::UnknownPartitioning(1), input.pipeline_behavior(),
            Boundedness::Bounded,
        )
    }

    /// Ordering required of the input, if any
    pub fn required_ordering(&self) -> &Option<LexOrdering> {
        &self.required_ordering
    }

    /// Set the ordering required of the input
    pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
        self.required_ordering = required_ordering;
    }

    /// Clone this node with a new child, keeping all other fields
    /// (metrics are reset to a fresh set).
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
124
125impl DisplayAs for GlobalLimitExec {
126 fn fmt_as(
127 &self,
128 t: DisplayFormatType,
129 f: &mut std::fmt::Formatter,
130 ) -> std::fmt::Result {
131 match t {
132 DisplayFormatType::Default | DisplayFormatType::Verbose => {
133 write!(
134 f,
135 "GlobalLimitExec: skip={}, fetch={}",
136 self.skip,
137 self.fetch
138 .map_or_else(|| "None".to_string(), |x| x.to_string())
139 )
140 }
141 DisplayFormatType::TreeRender => {
142 if let Some(fetch) = self.fetch {
143 writeln!(f, "limit={fetch}")?;
144 }
145 write!(f, "skip={}", self.skip)
146 }
147 }
148 }
149}
150
151impl ExecutionPlan for GlobalLimitExec {
152 fn name(&self) -> &'static str {
153 "GlobalLimitExec"
154 }
155
156 fn as_any(&self) -> &dyn Any {
158 self
159 }
160
161 fn properties(&self) -> &Arc<PlanProperties> {
162 &self.cache
163 }
164
165 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
166 vec![&self.input]
167 }
168
169 fn required_input_distribution(&self) -> Vec<Distribution> {
170 vec![Distribution::SinglePartition]
171 }
172
173 fn maintains_input_order(&self) -> Vec<bool> {
174 vec![true]
175 }
176
177 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
178 vec![false]
179 }
180
181 fn with_new_children(
182 self: Arc<Self>,
183 mut children: Vec<Arc<dyn ExecutionPlan>>,
184 ) -> Result<Arc<dyn ExecutionPlan>> {
185 check_if_same_properties!(self, children);
186 Ok(Arc::new(GlobalLimitExec::new(
187 children.swap_remove(0),
188 self.skip,
189 self.fetch,
190 )))
191 }
192
193 fn execute(
194 &self,
195 partition: usize,
196 context: Arc<TaskContext>,
197 ) -> Result<SendableRecordBatchStream> {
198 trace!("Start GlobalLimitExec::execute for partition: {partition}");
199 assert_eq_or_internal_err!(
201 partition,
202 0,
203 "GlobalLimitExec invalid partition {partition}"
204 );
205
206 assert_eq_or_internal_err!(
208 self.input.output_partitioning().partition_count(),
209 1,
210 "GlobalLimitExec requires a single input partition"
211 );
212
213 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
214 let stream = self.input.execute(0, context)?;
215 Ok(Box::pin(LimitStream::new(
216 stream,
217 self.skip,
218 self.fetch,
219 baseline_metrics,
220 )))
221 }
222
223 fn metrics(&self) -> Option<MetricsSet> {
224 Some(self.metrics.clone_inner())
225 }
226
227 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
228 self.input
229 .partition_statistics(partition)?
230 .with_fetch(self.fetch, self.skip, 1)
231 }
232
233 fn fetch(&self) -> Option<usize> {
234 self.fetch
235 }
236
237 fn supports_limit_pushdown(&self) -> bool {
238 true
239 }
240}
241
/// Limit execution plan that returns at most `fetch` rows from *each* of its
/// input partitions (no skip).
#[derive(Debug, Clone)]
pub struct LocalLimitExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Maximum number of rows to return per partition
    fetch: usize,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Ordering required of the input, if any (set via `set_required_ordering`)
    required_ordering: Option<LexOrdering>,
    /// Cached plan properties (equivalences, partitioning, boundedness)
    cache: Arc<PlanProperties>,
}
256
impl LocalLimitExec {
    /// Create a new `LocalLimitExec` that fetches at most `fetch` rows from
    /// each partition of `input`.
    pub fn new(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Self {
        let cache = Self::compute_properties(&input);
        Self {
            input,
            fetch,
            metrics: ExecutionPlanMetricsSet::new(),
            required_ordering: None,
            cache: Arc::new(cache),
        }
    }

    /// Input execution plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Maximum number of rows to return per partition
    pub fn fetch(&self) -> usize {
        self.fetch
    }

    /// Compute plan properties: equivalence properties, output partitioning
    /// and pipeline behavior are inherited from the input; output is bounded.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        PlanProperties::new(
            input.equivalence_properties().clone(), input.output_partitioning().clone(), input.pipeline_behavior(),
            Boundedness::Bounded,
        )
    }

    /// Ordering required of the input, if any
    pub fn required_ordering(&self) -> &Option<LexOrdering> {
        &self.required_ordering
    }

    /// Set the ordering required of the input
    pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
        self.required_ordering = required_ordering;
    }

    /// Clone this node with a new child, keeping all other fields
    /// (metrics are reset to a fresh set).
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
312
313impl DisplayAs for LocalLimitExec {
314 fn fmt_as(
315 &self,
316 t: DisplayFormatType,
317 f: &mut std::fmt::Formatter,
318 ) -> std::fmt::Result {
319 match t {
320 DisplayFormatType::Default | DisplayFormatType::Verbose => {
321 write!(f, "LocalLimitExec: fetch={}", self.fetch)
322 }
323 DisplayFormatType::TreeRender => {
324 write!(f, "limit={}", self.fetch)
325 }
326 }
327 }
328}
329
impl ExecutionPlan for LocalLimitExec {
    fn name(&self) -> &'static str {
        "LocalLimitExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        check_if_same_properties!(self, children);
        // Exactly one child is expected; anything else is an internal error.
        match children.len() {
            1 => Ok(Arc::new(LocalLimitExec::new(
                Arc::clone(&children[0]),
                self.fetch,
            ))),
            _ => internal_err!("LocalLimitExec wrong number of children"),
        }
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
        let stream = self.input.execute(partition, context)?;
        // Apply the per-partition limit: no skip, fetch at most `self.fetch`.
        Ok(Box::pin(LimitStream::new(
            stream,
            0,
            Some(self.fetch),
            baseline_metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Statistics are the input statistics capped at `fetch` rows (no skip).
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input
            .partition_statistics(partition)?
            .with_fetch(Some(self.fetch), 0, 1)
    }

    fn fetch(&self) -> Option<usize> {
        Some(self.fetch)
    }

    fn supports_limit_pushdown(&self) -> bool {
        true
    }

    /// A limit never increases the number of rows.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::LowerEqual
    }
}
413
/// A [`RecordBatchStream`] that skips `skip` rows and then yields at most
/// `fetch` rows from its input, recording metrics via `baseline_metrics`.
pub struct LimitStream {
    /// Remaining number of rows to skip
    skip: usize,
    /// Remaining number of rows to fetch; `usize::MAX` encodes "no limit"
    /// (see `LimitStream::new`)
    fetch: usize,
    /// The input stream; set to `None` once the limit is reached so the
    /// input can shut down early
    input: Option<SendableRecordBatchStream>,
    /// Copy of the input schema, kept so `schema()` still works after
    /// `input` has been dropped
    schema: SchemaRef,
    /// Execution-time metrics
    baseline_metrics: BaselineMetrics,
}
428
429impl LimitStream {
430 pub fn new(
431 input: SendableRecordBatchStream,
432 skip: usize,
433 fetch: Option<usize>,
434 baseline_metrics: BaselineMetrics,
435 ) -> Self {
436 let schema = input.schema();
437 Self {
438 skip,
439 fetch: fetch.unwrap_or(usize::MAX),
440 input: Some(input),
441 schema,
442 baseline_metrics,
443 }
444 }
445
446 fn poll_and_skip(
447 &mut self,
448 cx: &mut Context<'_>,
449 ) -> Poll<Option<Result<RecordBatch>>> {
450 let input = self.input.as_mut().unwrap();
451 loop {
452 let poll = input.poll_next_unpin(cx);
453 let poll = poll.map_ok(|batch| {
454 if batch.num_rows() <= self.skip {
455 self.skip -= batch.num_rows();
456 RecordBatch::new_empty(input.schema())
457 } else {
458 let new_batch = batch.slice(self.skip, batch.num_rows() - self.skip);
459 self.skip = 0;
460 new_batch
461 }
462 });
463
464 match &poll {
465 Poll::Ready(Some(Ok(batch))) => {
466 if batch.num_rows() > 0 {
467 break poll;
468 } else {
469 }
471 }
472 Poll::Ready(Some(Err(_e))) => break poll,
473 Poll::Ready(None) => break poll,
474 Poll::Pending => break poll,
475 }
476 }
477 }
478
479 fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
481 let _timer = self.baseline_metrics.elapsed_compute().timer();
483 if self.fetch == 0 {
484 self.input = None; None
486 } else if batch.num_rows() < self.fetch {
487 self.fetch -= batch.num_rows();
489 Some(batch)
490 } else if batch.num_rows() >= self.fetch {
491 let batch_rows = self.fetch;
492 self.fetch = 0;
493 self.input = None; Some(batch.slice(0, batch_rows))
497 } else {
498 unreachable!()
499 }
500 }
501}
502
impl Stream for LimitStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Once all `skip` rows have been consumed we can poll the input
        // directly; otherwise route through `poll_and_skip`.
        let fetch_started = self.skip == 0;
        let poll = match &mut self.input {
            Some(input) => {
                let poll = if fetch_started {
                    input.poll_next_unpin(cx)
                } else {
                    self.poll_and_skip(cx)
                };

                poll.map(|x| match x {
                    // `stream_limit` returns `None` once the limit is hit;
                    // `Ok(..).transpose()` turns that into end-of-stream.
                    Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
                    other => other,
                })
            }
            // Input was dropped when the limit was reached: stream is done.
            None => Poll::Ready(None),
        };

        // Record output rows / timing for this poll.
        self.baseline_metrics.record_poll(poll)
    }
}
531
impl RecordBatchStream for LimitStream {
    /// Schema of the record batches produced by this stream (copied from the
    /// input at construction time, so it remains valid after the input is
    /// dropped).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
538
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coalesce_partitions::CoalescePartitionsExec;
    use crate::common::collect;
    use crate::test;

    use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
    use arrow::array::RecordBatchOptions;
    use arrow::datatypes::Schema;
    use datafusion_common::stats::Precision;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_expr::expressions::col;

    // A GlobalLimitExec over coalesced partitions returns exactly `fetch` rows.
    #[tokio::test]
    async fn limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let limit =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 0, Some(7));

        let iter = limit.execute(0, task_ctx)?;
        let batches = collect(iter).await?;

        // Only the first 7 rows should be returned.
        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 7);

        Ok(())
    }

    // Once the limit is reached, LimitStream stops polling the input early.
    #[tokio::test]
    async fn limit_early_shutdown() -> Result<()> {
        let batches = vec![
            test::make_partition(5),
            test::make_partition(10),
            test::make_partition(15),
            test::make_partition(20),
            test::make_partition(25),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        // A limit of 6 consumes the entire first batch (5 rows) and one row
        // from the second batch.
        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        assert_eq!(num_rows, 6);

        // Only the first two batches should have been pulled from the input.
        assert_eq!(index.value(), 2);

        Ok(())
    }

    // When the limit exactly equals the first batch's size, only that one
    // batch is pulled from the input.
    #[tokio::test]
    async fn limit_equals_batch_size() -> Result<()> {
        let batches = vec![
            test::make_partition(6),
            test::make_partition(6),
            test::make_partition(6),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        assert_eq!(num_rows, 6);

        // Exactly one batch should have been pulled from the input.
        assert_eq!(index.value(), 1);

        Ok(())
    }

    // LimitStream also works on batches that carry a row count but no columns.
    #[tokio::test]
    async fn limit_no_column() -> Result<()> {
        let batches = vec![
            make_batch_no_column(6),
            make_batch_no_column(6),
            make_batch_no_column(6),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        assert_eq!(num_rows, 6);

        assert_eq!(index.value(), 1);

        Ok(())
    }

    /// Run a GlobalLimitExec with the given `skip`/`fetch` over 4 partitions
    /// and return the total number of rows produced.
    async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

        let iter = offset.execute(0, task_ctx)?;
        let batches = collect(iter).await?;
        Ok(batches.iter().map(|batch| batch.num_rows()).sum())
    }

    // No skip, no fetch: all 400 input rows come through.
    #[tokio::test]
    async fn skip_none_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(0, None).await?;
        assert_eq!(row_count, 400);
        Ok(())
    }

    #[tokio::test]
    async fn skip_none_fetch_50() -> Result<()> {
        let row_count = skip_and_fetch(0, Some(50)).await?;
        assert_eq!(row_count, 50);
        Ok(())
    }

    #[tokio::test]
    async fn skip_3_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(3, None).await?;
        assert_eq!(row_count, 397);
        Ok(())
    }

    #[tokio::test]
    async fn skip_3_fetch_10_stats() -> Result<()> {
        let row_count = skip_and_fetch(3, Some(10)).await?;
        assert_eq!(row_count, 10);
        Ok(())
    }

    // Skipping exactly the full input yields zero rows.
    #[tokio::test]
    async fn skip_400_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(400, None).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn skip_400_fetch_1() -> Result<()> {
        let row_count = skip_and_fetch(400, Some(1)).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    // Skipping past the end of the input yields zero rows (no error).
    #[tokio::test]
    async fn skip_401_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(401, None).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    // Exercise `partition_statistics` num_rows for various skip/fetch combos,
    // with both exact and inexact input statistics.
    #[tokio::test]
    async fn test_row_number_statistics_for_global_limit() -> Result<()> {
        let row_count = row_number_statistics_for_global_limit(0, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(10));

        let row_count = row_number_statistics_for_global_limit(5, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(10));

        let row_count = row_number_statistics_for_global_limit(400, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(0));

        let row_count = row_number_statistics_for_global_limit(398, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count = row_number_statistics_for_global_limit(398, Some(1)).await?;
        assert_eq!(row_count, Precision::Exact(1));

        let row_count = row_number_statistics_for_global_limit(398, None).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count =
            row_number_statistics_for_global_limit(0, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Exact(400));

        let row_count =
            row_number_statistics_for_global_limit(398, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(0, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(10));

        let row_count =
            row_number_inexact_statistics_for_global_limit(5, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(10));

        let row_count =
            row_number_inexact_statistics_for_global_limit(400, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(0));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(1)).await?;
        assert_eq!(row_count, Precision::Inexact(1));

        let row_count = row_number_inexact_statistics_for_global_limit(398, None).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(0, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Inexact(400));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        Ok(())
    }

    #[tokio::test]
    async fn test_row_number_statistics_for_local_limit() -> Result<()> {
        let row_count = row_number_statistics_for_local_limit(4, 10).await?;
        assert_eq!(row_count, Precision::Exact(10));

        Ok(())
    }

    /// num_rows statistic of a GlobalLimitExec over a plain scan (exact stats).
    async fn row_number_statistics_for_global_limit(
        skip: usize,
        fetch: Option<usize>,
    ) -> Result<Precision<usize>> {
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Build a single-grouping-set GROUP BY over the named columns.
    pub fn build_group_by(
        input_schema: &SchemaRef,
        columns: Vec<String>,
    ) -> PhysicalGroupBy {
        let mut group_by_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
        for column in columns.iter() {
            group_by_expr.push((col(column, input_schema).unwrap(), column.to_string()));
        }
        PhysicalGroupBy::new_single(group_by_expr.clone())
    }

    /// num_rows statistic of a GlobalLimitExec over an aggregate; the
    /// intermediate GROUP BY is used here to make the input statistics
    /// inexact (compare the Inexact expectations in the test above).
    async fn row_number_inexact_statistics_for_global_limit(
        skip: usize,
        fetch: Option<usize>,
    ) -> Result<Precision<usize>> {
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let agg = AggregateExec::try_new(
            AggregateMode::Final,
            build_group_by(&csv.schema(), vec!["i".to_string()]),
            vec![],
            vec![],
            Arc::clone(&csv),
            Arc::clone(&csv.schema()),
        )?;
        let agg_exec: Arc<dyn ExecutionPlan> = Arc::new(agg);

        let offset = GlobalLimitExec::new(
            Arc::new(CoalescePartitionsExec::new(agg_exec)),
            skip,
            fetch,
        );

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// num_rows statistic of a LocalLimitExec over a partitioned scan.
    async fn row_number_statistics_for_local_limit(
        num_partitions: usize,
        fetch: usize,
    ) -> Result<Precision<usize>> {
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset = LocalLimitExec::new(csv, fetch);

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Create a RecordBatch with `sz` rows and no columns (row count is
    /// carried via RecordBatchOptions).
    fn make_batch_no_column(sz: usize) -> RecordBatch {
        let schema = Arc::new(Schema::empty());

        let options = RecordBatchOptions::new().with_row_count(Option::from(sz));
        RecordBatch::try_new_with_options(schema, vec![], &options).unwrap()
    }
}