1use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26 DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27 SendableRecordBatchStream, Statistics,
28};
29use crate::execution_plan::{Boundedness, CardinalityEffect};
30use crate::{
31 DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
32 check_if_same_properties,
33};
34
35use arrow::datatypes::SchemaRef;
36use arrow::record_batch::RecordBatch;
37use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
38use datafusion_execution::TaskContext;
39
40use datafusion_physical_expr::LexOrdering;
41use futures::stream::{Stream, StreamExt};
42use log::trace;
43
/// Limit operator that discards the first `skip` rows of its input and then
/// emits at most `fetch` rows.
///
/// It requires a single input partition and produces a single output
/// partition (see `compute_properties` and `execute`).
#[derive(Debug, Clone)]
pub struct GlobalLimitExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Number of rows to skip before fetching
    skip: usize,
    /// Maximum number of rows to emit after skipping;
    /// `None` means emit all remaining rows
    fetch: Option<usize>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Optional ordering requirement attached via `set_required_ordering`.
    /// This type only stores and exposes it; presumably optimizer rules consult
    /// it before reordering the input (NOTE(review): confirm with callers).
    required_ordering: Option<LexOrdering>,
    /// Plan properties computed once at construction time
    cache: Arc<PlanProperties>,
}
61
62impl GlobalLimitExec {
63 pub fn new(input: Arc<dyn ExecutionPlan>, skip: usize, fetch: Option<usize>) -> Self {
65 let cache = Self::compute_properties(&input);
66 GlobalLimitExec {
67 input,
68 skip,
69 fetch,
70 metrics: ExecutionPlanMetricsSet::new(),
71 required_ordering: None,
72 cache: Arc::new(cache),
73 }
74 }
75
76 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
78 &self.input
79 }
80
81 pub fn skip(&self) -> usize {
83 self.skip
84 }
85
86 pub fn fetch(&self) -> Option<usize> {
88 self.fetch
89 }
90
91 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
93 PlanProperties::new(
94 input.equivalence_properties().clone(), Partitioning::UnknownPartitioning(1), input.pipeline_behavior(),
97 Boundedness::Bounded,
99 )
100 }
101
102 pub fn required_ordering(&self) -> &Option<LexOrdering> {
104 &self.required_ordering
105 }
106
107 pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
109 self.required_ordering = required_ordering;
110 }
111
112 fn with_new_children_and_same_properties(
113 &self,
114 mut children: Vec<Arc<dyn ExecutionPlan>>,
115 ) -> Self {
116 Self {
117 input: children.swap_remove(0),
118 metrics: ExecutionPlanMetricsSet::new(),
119 ..Self::clone(self)
120 }
121 }
122}
123
124impl DisplayAs for GlobalLimitExec {
125 fn fmt_as(
126 &self,
127 t: DisplayFormatType,
128 f: &mut std::fmt::Formatter,
129 ) -> std::fmt::Result {
130 match t {
131 DisplayFormatType::Default | DisplayFormatType::Verbose => {
132 write!(
133 f,
134 "GlobalLimitExec: skip={}, fetch={}",
135 self.skip,
136 self.fetch
137 .map_or_else(|| "None".to_string(), |x| x.to_string())
138 )
139 }
140 DisplayFormatType::TreeRender => {
141 if let Some(fetch) = self.fetch {
142 writeln!(f, "limit={fetch}")?;
143 }
144 write!(f, "skip={}", self.skip)
145 }
146 }
147 }
148}
149
150impl ExecutionPlan for GlobalLimitExec {
151 fn name(&self) -> &'static str {
152 "GlobalLimitExec"
153 }
154
155 fn properties(&self) -> &Arc<PlanProperties> {
157 &self.cache
158 }
159
160 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
161 vec![&self.input]
162 }
163
164 fn required_input_distribution(&self) -> Vec<Distribution> {
165 vec![Distribution::SinglePartition]
166 }
167
168 fn maintains_input_order(&self) -> Vec<bool> {
169 vec![true]
170 }
171
172 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
173 vec![false]
174 }
175
176 fn with_new_children(
177 self: Arc<Self>,
178 mut children: Vec<Arc<dyn ExecutionPlan>>,
179 ) -> Result<Arc<dyn ExecutionPlan>> {
180 check_if_same_properties!(self, children);
181 Ok(Arc::new(GlobalLimitExec::new(
182 children.swap_remove(0),
183 self.skip,
184 self.fetch,
185 )))
186 }
187
188 fn execute(
189 &self,
190 partition: usize,
191 context: Arc<TaskContext>,
192 ) -> Result<SendableRecordBatchStream> {
193 trace!("Start GlobalLimitExec::execute for partition: {partition}");
194 assert_eq_or_internal_err!(
196 partition,
197 0,
198 "GlobalLimitExec invalid partition {partition}"
199 );
200
201 assert_eq_or_internal_err!(
203 self.input.output_partitioning().partition_count(),
204 1,
205 "GlobalLimitExec requires a single input partition"
206 );
207
208 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
209 let stream = self.input.execute(0, context)?;
210 Ok(Box::pin(LimitStream::new(
211 stream,
212 self.skip,
213 self.fetch,
214 baseline_metrics,
215 )))
216 }
217
218 fn metrics(&self) -> Option<MetricsSet> {
219 Some(self.metrics.clone_inner())
220 }
221
222 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
223 let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
224 Ok(Arc::new(stats.with_fetch(self.fetch, self.skip, 1)?))
225 }
226
227 fn fetch(&self) -> Option<usize> {
228 self.fetch
229 }
230
231 fn supports_limit_pushdown(&self) -> bool {
232 true
233 }
234}
235
/// Limit operator that emits at most `fetch` rows from each input partition.
///
/// It keeps the input partitioning unchanged (see `compute_properties`).
#[derive(Debug, Clone)]
pub struct LocalLimitExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Maximum number of rows emitted per partition
    fetch: usize,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Optional ordering requirement attached via `set_required_ordering`.
    /// This type only stores and exposes it; presumably optimizer rules consult
    /// it before reordering the input (NOTE(review): confirm with callers).
    required_ordering: Option<LexOrdering>,
    /// Plan properties computed once at construction time
    cache: Arc<PlanProperties>,
}
250
251impl LocalLimitExec {
252 pub fn new(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Self {
254 let cache = Self::compute_properties(&input);
255 Self {
256 input,
257 fetch,
258 metrics: ExecutionPlanMetricsSet::new(),
259 required_ordering: None,
260 cache: Arc::new(cache),
261 }
262 }
263
264 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
266 &self.input
267 }
268
269 pub fn fetch(&self) -> usize {
271 self.fetch
272 }
273
274 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
276 PlanProperties::new(
277 input.equivalence_properties().clone(), input.output_partitioning().clone(), input.pipeline_behavior(),
280 Boundedness::Bounded,
282 )
283 }
284
285 pub fn required_ordering(&self) -> &Option<LexOrdering> {
287 &self.required_ordering
288 }
289
290 pub fn set_required_ordering(&mut self, required_ordering: Option<LexOrdering>) {
292 self.required_ordering = required_ordering;
293 }
294
295 fn with_new_children_and_same_properties(
296 &self,
297 mut children: Vec<Arc<dyn ExecutionPlan>>,
298 ) -> Self {
299 Self {
300 input: children.swap_remove(0),
301 metrics: ExecutionPlanMetricsSet::new(),
302 ..Self::clone(self)
303 }
304 }
305}
306
307impl DisplayAs for LocalLimitExec {
308 fn fmt_as(
309 &self,
310 t: DisplayFormatType,
311 f: &mut std::fmt::Formatter,
312 ) -> std::fmt::Result {
313 match t {
314 DisplayFormatType::Default | DisplayFormatType::Verbose => {
315 write!(f, "LocalLimitExec: fetch={}", self.fetch)
316 }
317 DisplayFormatType::TreeRender => {
318 write!(f, "limit={}", self.fetch)
319 }
320 }
321 }
322}
323
324impl ExecutionPlan for LocalLimitExec {
325 fn name(&self) -> &'static str {
326 "LocalLimitExec"
327 }
328
329 fn properties(&self) -> &Arc<PlanProperties> {
331 &self.cache
332 }
333
334 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
335 vec![&self.input]
336 }
337
338 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
339 vec![false]
340 }
341
342 fn maintains_input_order(&self) -> Vec<bool> {
343 vec![true]
344 }
345
346 fn with_new_children(
347 self: Arc<Self>,
348 children: Vec<Arc<dyn ExecutionPlan>>,
349 ) -> Result<Arc<dyn ExecutionPlan>> {
350 check_if_same_properties!(self, children);
351 match children.len() {
352 1 => Ok(Arc::new(LocalLimitExec::new(
353 Arc::clone(&children[0]),
354 self.fetch,
355 ))),
356 _ => internal_err!("LocalLimitExec wrong number of children"),
357 }
358 }
359
360 fn execute(
361 &self,
362 partition: usize,
363 context: Arc<TaskContext>,
364 ) -> Result<SendableRecordBatchStream> {
365 trace!(
366 "Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}",
367 partition,
368 context.session_id(),
369 context.task_id()
370 );
371 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
372 let stream = self.input.execute(partition, context)?;
373 Ok(Box::pin(LimitStream::new(
374 stream,
375 0,
376 Some(self.fetch),
377 baseline_metrics,
378 )))
379 }
380
381 fn metrics(&self) -> Option<MetricsSet> {
382 Some(self.metrics.clone_inner())
383 }
384
385 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
386 let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
387 Ok(Arc::new(stats.with_fetch(Some(self.fetch), 0, 1)?))
388 }
389
390 fn fetch(&self) -> Option<usize> {
391 Some(self.fetch)
392 }
393
394 fn supports_limit_pushdown(&self) -> bool {
395 true
396 }
397
398 fn cardinality_effect(&self) -> CardinalityEffect {
399 CardinalityEffect::LowerEqual
400 }
401}
402
/// Stream adapter that discards the first `skip` rows of its input and then
/// yields at most `fetch` rows.
///
/// Once the fetch is satisfied, the input is dropped early.
pub struct LimitStream {
    /// Rows still to be discarded before any output is produced
    skip: usize,
    /// Rows still allowed to be emitted; `usize::MAX` when no fetch was given
    fetch: usize,
    /// Input stream; set to `None` once no more rows are needed so it can be
    /// dropped early
    input: Option<SendableRecordBatchStream>,
    /// Schema of the input, which is also the output schema
    schema: SchemaRef,
    /// Execution metrics
    baseline_metrics: BaselineMetrics,
}
417
418impl LimitStream {
419 pub fn new(
420 input: SendableRecordBatchStream,
421 skip: usize,
422 fetch: Option<usize>,
423 baseline_metrics: BaselineMetrics,
424 ) -> Self {
425 let schema = input.schema();
426 Self {
427 skip,
428 fetch: fetch.unwrap_or(usize::MAX),
429 input: Some(input),
430 schema,
431 baseline_metrics,
432 }
433 }
434
435 fn poll_and_skip(
436 &mut self,
437 cx: &mut Context<'_>,
438 ) -> Poll<Option<Result<RecordBatch>>> {
439 let input = self.input.as_mut().unwrap();
440 loop {
441 let poll = input.poll_next_unpin(cx);
442 let poll = poll.map_ok(|batch| {
443 if batch.num_rows() <= self.skip {
444 self.skip -= batch.num_rows();
445 RecordBatch::new_empty(input.schema())
446 } else {
447 let new_batch = batch.slice(self.skip, batch.num_rows() - self.skip);
448 self.skip = 0;
449 new_batch
450 }
451 });
452
453 match &poll {
454 Poll::Ready(Some(Ok(batch))) => {
455 if batch.num_rows() > 0 {
456 break poll;
457 } else {
458 }
460 }
461 Poll::Ready(Some(Err(_e))) => break poll,
462 Poll::Ready(None) => break poll,
463 Poll::Pending => break poll,
464 }
465 }
466 }
467
468 fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
470 let _timer = self.baseline_metrics.elapsed_compute().timer();
472 if self.fetch == 0 {
473 self.input = None; None
475 } else if batch.num_rows() < self.fetch {
476 self.fetch -= batch.num_rows();
478 Some(batch)
479 } else if batch.num_rows() >= self.fetch {
480 let batch_rows = self.fetch;
481 self.fetch = 0;
482 self.input = None; Some(batch.slice(0, batch_rows))
486 } else {
487 unreachable!()
488 }
489 }
490}
491
492impl Stream for LimitStream {
493 type Item = Result<RecordBatch>;
494
495 fn poll_next(
496 mut self: Pin<&mut Self>,
497 cx: &mut Context<'_>,
498 ) -> Poll<Option<Self::Item>> {
499 let fetch_started = self.skip == 0;
500 let poll = match &mut self.input {
501 Some(input) => {
502 let poll = if fetch_started {
503 input.poll_next_unpin(cx)
504 } else {
505 self.poll_and_skip(cx)
506 };
507
508 poll.map(|x| match x {
509 Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
510 other => other,
511 })
512 }
513 None => Poll::Ready(None),
515 };
516
517 self.baseline_metrics.record_poll(poll)
518 }
519}
520
521impl RecordBatchStream for LimitStream {
522 fn schema(&self) -> SchemaRef {
524 Arc::clone(&self.schema)
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coalesce_partitions::CoalescePartitionsExec;
    use crate::common::collect;
    use crate::test;

    use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
    use arrow::array::RecordBatchOptions;
    use arrow::datatypes::Schema;
    use datafusion_common::stats::Precision;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_expr::expressions::col;

    /// A global limit of 7 over a coalesced 4-partition scan returns 7 rows.
    #[tokio::test]
    async fn limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        // the scan fixture should expose the requested number of partitions
        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        // GlobalLimitExec needs a single input partition, hence the coalesce
        let limit =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 0, Some(7));

        let iter = limit.execute(0, task_ctx)?;
        let batches = collect(iter).await?;

        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 7);

        Ok(())
    }

    /// LimitStream stops pulling from its input once the fetch is satisfied.
    #[tokio::test]
    async fn limit_early_shutdown() -> Result<()> {
        let batches = vec![
            test::make_partition(5),
            test::make_partition(10),
            test::make_partition(15),
            test::make_partition(20),
            test::make_partition(25),
        ];
        let input = test::exec::TestStream::new(batches);

        // NOTE(review): `index` presumably counts batches pulled from the
        // TestStream; confirm in test::exec::TestStream
        let index = input.index();
        assert_eq!(index.value(), 0);

        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        // constructing the stream must not poll the input
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        // only the first 6 rows are returned
        assert_eq!(num_rows, 6);

        // 5 + 10 rows already satisfy fetch=6, so only two batches are pulled
        assert_eq!(index.value(), 2);

        Ok(())
    }

    /// A fetch exactly equal to the first batch's size consumes only that batch.
    #[tokio::test]
    async fn limit_equals_batch_size() -> Result<()> {
        let batches = vec![
            test::make_partition(6),
            test::make_partition(6),
            test::make_partition(6),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        assert_eq!(num_rows, 6);

        // the first 6-row batch satisfies the fetch; no further batch is pulled
        assert_eq!(index.value(), 1);

        Ok(())
    }

    /// Limits work on batches with a row count but no columns.
    #[tokio::test]
    async fn limit_no_column() -> Result<()> {
        let batches = vec![
            make_batch_no_column(6),
            make_batch_no_column(6),
            make_batch_no_column(6),
        ];
        let input = test::exec::TestStream::new(batches);

        let index = input.index();
        assert_eq!(index.value(), 0);

        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let limit_stream =
            LimitStream::new(Box::pin(input), 0, Some(6), baseline_metrics);
        assert_eq!(index.value(), 0);

        let results = collect(Box::pin(limit_stream)).await.unwrap();
        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
        assert_eq!(num_rows, 6);

        // the first batch satisfies the fetch; no further batch is pulled
        assert_eq!(index.value(), 1);

        Ok(())
    }

    /// Runs a GlobalLimitExec with the given skip/fetch over a coalesced
    /// 4-partition scan and returns the number of rows produced.
    async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        // the scan yields 400 rows in total (see skip_none_fetch_none)
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

        let iter = offset.execute(0, task_ctx)?;
        let batches = collect(iter).await?;
        Ok(batches.iter().map(|batch| batch.num_rows()).sum())
    }

    #[tokio::test]
    async fn skip_none_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(0, None).await?;
        assert_eq!(row_count, 400);
        Ok(())
    }

    #[tokio::test]
    async fn skip_none_fetch_50() -> Result<()> {
        let row_count = skip_and_fetch(0, Some(50)).await?;
        assert_eq!(row_count, 50);
        Ok(())
    }

    #[tokio::test]
    async fn skip_3_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(3, None).await?;
        // all rows except the 3 skipped ones
        assert_eq!(row_count, 397);
        Ok(())
    }

    #[tokio::test]
    async fn skip_3_fetch_10_stats() -> Result<()> {
        let row_count = skip_and_fetch(3, Some(10)).await?;
        // skipping 3 leaves more than 10 rows, so the fetch caps the output
        assert_eq!(row_count, 10);
        Ok(())
    }

    #[tokio::test]
    async fn skip_400_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(400, None).await?;
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn skip_400_fetch_1() -> Result<()> {
        let row_count = skip_and_fetch(400, Some(1)).await?;
        // every row is skipped, so nothing is left to fetch
        assert_eq!(row_count, 0);
        Ok(())
    }

    #[tokio::test]
    async fn skip_401_fetch_none() -> Result<()> {
        let row_count = skip_and_fetch(401, None).await?;
        // skipping past the end yields no rows (and no error)
        assert_eq!(row_count, 0);
        Ok(())
    }

    /// Row-count statistics of GlobalLimitExec for exact (scan) and inexact
    /// (aggregate) inputs, across several skip/fetch combinations.
    #[tokio::test]
    async fn test_row_number_statistics_for_global_limit() -> Result<()> {
        let row_count = row_number_statistics_for_global_limit(0, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(10));

        let row_count = row_number_statistics_for_global_limit(5, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(10));

        let row_count = row_number_statistics_for_global_limit(400, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(0));

        let row_count = row_number_statistics_for_global_limit(398, Some(10)).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count = row_number_statistics_for_global_limit(398, Some(1)).await?;
        assert_eq!(row_count, Precision::Exact(1));

        let row_count = row_number_statistics_for_global_limit(398, None).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count =
            row_number_statistics_for_global_limit(0, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Exact(400));

        let row_count =
            row_number_statistics_for_global_limit(398, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Exact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(0, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(10));

        let row_count =
            row_number_inexact_statistics_for_global_limit(5, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(10));

        let row_count =
            row_number_inexact_statistics_for_global_limit(400, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(0));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(10)).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(1)).await?;
        assert_eq!(row_count, Precision::Inexact(1));

        let row_count = row_number_inexact_statistics_for_global_limit(398, None).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        let row_count =
            row_number_inexact_statistics_for_global_limit(0, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Inexact(400));

        let row_count =
            row_number_inexact_statistics_for_global_limit(398, Some(usize::MAX)).await?;
        assert_eq!(row_count, Precision::Inexact(2));

        Ok(())
    }

    /// Row-count statistics of LocalLimitExec over a 4-partition scan.
    #[tokio::test]
    async fn test_row_number_statistics_for_local_limit() -> Result<()> {
        let row_count = row_number_statistics_for_local_limit(4, 10).await?;
        assert_eq!(row_count, Precision::Exact(10));

        Ok(())
    }

    /// Builds a GlobalLimitExec over a coalesced 4-partition scan and returns
    /// its overall row-count statistic.
    async fn row_number_statistics_for_global_limit(
        skip: usize,
        fetch: Option<usize>,
    ) -> Result<Precision<usize>> {
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset =
            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch);

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Builds a single grouping set grouping by the named columns of
    /// `input_schema`. Panics if a column is missing (test helper).
    pub fn build_group_by(
        input_schema: &SchemaRef,
        columns: Vec<String>,
    ) -> PhysicalGroupBy {
        let mut group_by_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
        for column in columns.iter() {
            group_by_expr.push((col(column, input_schema).unwrap(), column.to_string()));
        }
        PhysicalGroupBy::new_single(group_by_expr.clone())
    }

    /// Like `row_number_statistics_for_global_limit`, but with a Final
    /// aggregate (grouping by column `i`) between the scan and the limit. The
    /// expected results in the test above show that this makes the row counts
    /// `Inexact`.
    async fn row_number_inexact_statistics_for_global_limit(
        skip: usize,
        fetch: Option<usize>,
    ) -> Result<Precision<usize>> {
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let agg = AggregateExec::try_new(
            AggregateMode::Final,
            build_group_by(&csv.schema(), vec!["i".to_string()]),
            vec![],
            vec![],
            Arc::clone(&csv),
            Arc::clone(&csv.schema()),
        )?;
        let agg_exec: Arc<dyn ExecutionPlan> = Arc::new(agg);

        let offset = GlobalLimitExec::new(
            Arc::new(CoalescePartitionsExec::new(agg_exec)),
            skip,
            fetch,
        );

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Builds a LocalLimitExec directly over a partitioned scan (no coalesce)
    /// and returns its overall row-count statistic.
    async fn row_number_statistics_for_local_limit(
        num_partitions: usize,
        fetch: usize,
    ) -> Result<Precision<usize>> {
        let csv = test::scan_partitioned(num_partitions);

        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let offset = LocalLimitExec::new(csv, fetch);

        Ok(offset.partition_statistics(None)?.num_rows)
    }

    /// Creates a RecordBatch with no columns but a row count of `sz`.
    fn make_batch_no_column(sz: usize) -> RecordBatch {
        let schema = Arc::new(Schema::empty());

        let options = RecordBatchOptions::new().with_row_count(Option::from(sz));
        RecordBatch::try_new_with_options(schema, vec![], &options).unwrap()
    }
}