datafusion_physical_plan/test/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Simple iterator over batches for use in testing
19
20use std::{
21    any::Any,
22    pin::Pin,
23    sync::{Arc, Weak},
24    task::{Context, Poll},
25};
26
27use crate::{
28    common, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan,
29    Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
30    Statistics,
31};
32use crate::{
33    execution_plan::EmissionType,
34    stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
35};
36
37use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
38use arrow::record_batch::RecordBatch;
39use datafusion_common::{internal_err, DataFusionError, Result};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::EquivalenceProperties;
42
43use futures::Stream;
44use tokio::sync::Barrier;
45
/// Shared, thread-safe counter recording how many batches a stream has
/// handed out so far.
///
/// Cloning a `BatchIndex` yields another handle to the *same* counter, so a
/// test can keep a clone and observe the progress made by a stream.
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    inner: Arc<std::sync::Mutex<usize>>,
}

impl BatchIndex {
    /// Returns the number of batches produced so far.
    ///
    /// # Panics
    /// Panics if the underlying mutex has been poisoned.
    pub fn value(&self) -> usize {
        *self.inner.lock().unwrap()
    }

    /// Advances the counter by one.
    ///
    /// # Panics
    /// Panics if the underlying mutex has been poisoned.
    pub fn incr(&self) {
        *self.inner.lock().unwrap() += 1;
    }
}
65
66/// Iterator over batches
67#[derive(Debug, Default)]
68pub struct TestStream {
69    /// Vector of record batches
70    data: Vec<RecordBatch>,
71    /// Index into the data that has been returned so far
72    index: BatchIndex,
73}
74
75impl TestStream {
76    /// Create an iterator for a vector of record batches. Assumes at
77    /// least one entry in data (for the schema)
78    pub fn new(data: Vec<RecordBatch>) -> Self {
79        Self {
80            data,
81            ..Default::default()
82        }
83    }
84
85    /// Return a handle to the index counter for this stream
86    pub fn index(&self) -> BatchIndex {
87        self.index.clone()
88    }
89}
90
91impl Stream for TestStream {
92    type Item = Result<RecordBatch>;
93
94    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95        let next_batch = self.index.value();
96
97        Poll::Ready(if next_batch < self.data.len() {
98            let next_batch = self.index.value();
99            self.index.incr();
100            Some(Ok(self.data[next_batch].clone()))
101        } else {
102            None
103        })
104    }
105
106    fn size_hint(&self) -> (usize, Option<usize>) {
107        (self.data.len(), Some(self.data.len()))
108    }
109}
110
111impl RecordBatchStream for TestStream {
112    /// Get the schema
113    fn schema(&self) -> SchemaRef {
114        self.data[0].schema()
115    }
116}
117
/// A Mock ExecutionPlan that can be used for writing tests of other
/// ExecutionPlans
///
/// It exposes a single output partition that yields the entries of `data`
/// (batches or errors) in order.
#[derive(Debug)]
pub struct MockExec {
    /// the results to send back, in order
    data: Vec<Result<RecordBatch>>,
    /// output schema reported by this plan
    schema: SchemaRef,
    /// if true (the default), sends data using a separate task to ensure the
    /// batches are not available without this stream yielding first
    use_task: bool,
    /// cached plan properties (built by `compute_properties`)
    cache: PlanProperties,
}
130
131impl MockExec {
132    /// Create a new `MockExec` with a single partition that returns
133    /// the specified `Results`s.
134    ///
135    /// By default, the batches are not produced immediately (the
136    /// caller has to actually yield and another task must run) to
137    /// ensure any poll loops are correct. This behavior can be
138    /// changed with `with_use_task`
139    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
140        let cache = Self::compute_properties(Arc::clone(&schema));
141        Self {
142            data,
143            schema,
144            use_task: true,
145            cache,
146        }
147    }
148
149    /// If `use_task` is true (the default) then the batches are sent
150    /// back using a separate task to ensure the underlying stream is
151    /// not immediately ready
152    pub fn with_use_task(mut self, use_task: bool) -> Self {
153        self.use_task = use_task;
154        self
155    }
156
157    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
158    fn compute_properties(schema: SchemaRef) -> PlanProperties {
159        PlanProperties::new(
160            EquivalenceProperties::new(schema),
161            Partitioning::UnknownPartitioning(1),
162            EmissionType::Incremental,
163            Boundedness::Bounded,
164        )
165    }
166}
167
168impl DisplayAs for MockExec {
169    fn fmt_as(
170        &self,
171        t: DisplayFormatType,
172        f: &mut std::fmt::Formatter,
173    ) -> std::fmt::Result {
174        match t {
175            DisplayFormatType::Default | DisplayFormatType::Verbose => {
176                write!(f, "MockExec")
177            }
178            DisplayFormatType::TreeRender => {
179                // TODO: collect info
180                write!(f, "")
181            }
182        }
183    }
184}
185
186impl ExecutionPlan for MockExec {
187    fn name(&self) -> &'static str {
188        Self::static_name()
189    }
190
191    fn as_any(&self) -> &dyn Any {
192        self
193    }
194
195    fn properties(&self) -> &PlanProperties {
196        &self.cache
197    }
198
199    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
200        vec![]
201    }
202
203    fn with_new_children(
204        self: Arc<Self>,
205        _: Vec<Arc<dyn ExecutionPlan>>,
206    ) -> Result<Arc<dyn ExecutionPlan>> {
207        unimplemented!()
208    }
209
210    /// Returns a stream which yields data
211    fn execute(
212        &self,
213        partition: usize,
214        _context: Arc<TaskContext>,
215    ) -> Result<SendableRecordBatchStream> {
216        assert_eq!(partition, 0);
217
218        // Result doesn't implement clone, so do it ourself
219        let data: Vec<_> = self
220            .data
221            .iter()
222            .map(|r| match r {
223                Ok(batch) => Ok(batch.clone()),
224                Err(e) => Err(clone_error(e)),
225            })
226            .collect();
227
228        if self.use_task {
229            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
230            // send data in order but in a separate task (to ensure
231            // the batches are not available without the stream
232            // yielding).
233            let tx = builder.tx();
234            builder.spawn(async move {
235                for batch in data {
236                    println!("Sending batch via delayed stream");
237                    if let Err(e) = tx.send(batch).await {
238                        println!("ERROR batch via delayed stream: {e}");
239                    }
240                }
241
242                Ok(())
243            });
244            // returned stream simply reads off the rx stream
245            Ok(builder.build())
246        } else {
247            // make an input that will error
248            let stream = futures::stream::iter(data);
249            Ok(Box::pin(RecordBatchStreamAdapter::new(
250                self.schema(),
251                stream,
252            )))
253        }
254    }
255
256    // Panics if one of the batches is an error
257    fn statistics(&self) -> Result<Statistics> {
258        self.partition_statistics(None)
259    }
260
261    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
262        if partition.is_some() {
263            return Ok(Statistics::new_unknown(&self.schema));
264        }
265        let data: Result<Vec<_>> = self
266            .data
267            .iter()
268            .map(|r| match r {
269                Ok(batch) => Ok(batch.clone()),
270                Err(e) => Err(clone_error(e)),
271            })
272            .collect();
273
274        let data = data?;
275
276        Ok(common::compute_record_batch_statistics(
277            &[data],
278            &self.schema,
279            None,
280        ))
281    }
282}
283
284fn clone_error(e: &DataFusionError) -> DataFusionError {
285    use DataFusionError::*;
286    match e {
287        Execution(msg) => Execution(msg.to_string()),
288        _ => unimplemented!(),
289    }
290}
291
/// A Mock ExecutionPlan that does not start producing output until a shared
/// barrier is reached
///
/// Every partition's stream waits on the barrier before sending any batches.
/// The barrier also waits for one extra participant: the caller of
/// `BarrierExec::wait`.
#[derive(Debug)]
pub struct BarrierExec {
    /// partitions to send back: one `Vec<RecordBatch>` per output partition
    data: Vec<Vec<RecordBatch>>,
    /// output schema reported by this plan
    schema: SchemaRef,

    /// all streams wait on this barrier to produce
    barrier: Arc<Barrier>,
    /// cached plan properties (built by `compute_properties`)
    cache: PlanProperties,
}
305
306impl BarrierExec {
307    /// Create a new exec with some number of partitions.
308    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
309        // wait for all streams and the input
310        let barrier = Arc::new(Barrier::new(data.len() + 1));
311        let cache = Self::compute_properties(Arc::clone(&schema), &data);
312        Self {
313            data,
314            schema,
315            barrier,
316            cache,
317        }
318    }
319
320    /// wait until all the input streams and this function is ready
321    pub async fn wait(&self) {
322        println!("BarrierExec::wait waiting on barrier");
323        self.barrier.wait().await;
324        println!("BarrierExec::wait done waiting");
325    }
326
327    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
328    fn compute_properties(
329        schema: SchemaRef,
330        data: &[Vec<RecordBatch>],
331    ) -> PlanProperties {
332        PlanProperties::new(
333            EquivalenceProperties::new(schema),
334            Partitioning::UnknownPartitioning(data.len()),
335            EmissionType::Incremental,
336            Boundedness::Bounded,
337        )
338    }
339}
340
341impl DisplayAs for BarrierExec {
342    fn fmt_as(
343        &self,
344        t: DisplayFormatType,
345        f: &mut std::fmt::Formatter,
346    ) -> std::fmt::Result {
347        match t {
348            DisplayFormatType::Default | DisplayFormatType::Verbose => {
349                write!(f, "BarrierExec")
350            }
351            DisplayFormatType::TreeRender => {
352                // TODO: collect info
353                write!(f, "")
354            }
355        }
356    }
357}
358
359impl ExecutionPlan for BarrierExec {
360    fn name(&self) -> &'static str {
361        Self::static_name()
362    }
363
364    fn as_any(&self) -> &dyn Any {
365        self
366    }
367
368    fn properties(&self) -> &PlanProperties {
369        &self.cache
370    }
371
372    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
373        unimplemented!()
374    }
375
376    fn with_new_children(
377        self: Arc<Self>,
378        _: Vec<Arc<dyn ExecutionPlan>>,
379    ) -> Result<Arc<dyn ExecutionPlan>> {
380        unimplemented!()
381    }
382
383    /// Returns a stream which yields data
384    fn execute(
385        &self,
386        partition: usize,
387        _context: Arc<TaskContext>,
388    ) -> Result<SendableRecordBatchStream> {
389        assert!(partition < self.data.len());
390
391        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
392
393        // task simply sends data in order after barrier is reached
394        let data = self.data[partition].clone();
395        let b = Arc::clone(&self.barrier);
396        let tx = builder.tx();
397        builder.spawn(async move {
398            println!("Partition {partition} waiting on barrier");
399            b.wait().await;
400            for batch in data {
401                println!("Partition {partition} sending batch");
402                if let Err(e) = tx.send(Ok(batch)).await {
403                    println!("ERROR batch via barrier stream stream: {e}");
404                }
405            }
406
407            Ok(())
408        });
409
410        // returned stream simply reads off the rx stream
411        Ok(builder.build())
412    }
413
414    fn statistics(&self) -> Result<Statistics> {
415        self.partition_statistics(None)
416    }
417
418    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
419        if partition.is_some() {
420            return Ok(Statistics::new_unknown(&self.schema));
421        }
422        Ok(common::compute_record_batch_statistics(
423            &self.data,
424            &self.schema,
425            None,
426        ))
427    }
428}
429
/// A mock execution plan that errors on a call to execute
///
/// It reports a single output partition, but never produces data: `execute`
/// always returns an internal error.
#[derive(Debug)]
pub struct ErrorExec {
    /// cached plan properties (built by `compute_properties`)
    cache: PlanProperties,
}
435
436impl Default for ErrorExec {
437    fn default() -> Self {
438        Self::new()
439    }
440}
441
442impl ErrorExec {
443    pub fn new() -> Self {
444        let schema = Arc::new(Schema::new(vec![Field::new(
445            "dummy",
446            DataType::Int64,
447            true,
448        )]));
449        let cache = Self::compute_properties(schema);
450        Self { cache }
451    }
452
453    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
454    fn compute_properties(schema: SchemaRef) -> PlanProperties {
455        PlanProperties::new(
456            EquivalenceProperties::new(schema),
457            Partitioning::UnknownPartitioning(1),
458            EmissionType::Incremental,
459            Boundedness::Bounded,
460        )
461    }
462}
463
464impl DisplayAs for ErrorExec {
465    fn fmt_as(
466        &self,
467        t: DisplayFormatType,
468        f: &mut std::fmt::Formatter,
469    ) -> std::fmt::Result {
470        match t {
471            DisplayFormatType::Default | DisplayFormatType::Verbose => {
472                write!(f, "ErrorExec")
473            }
474            DisplayFormatType::TreeRender => {
475                // TODO: collect info
476                write!(f, "")
477            }
478        }
479    }
480}
481
482impl ExecutionPlan for ErrorExec {
483    fn name(&self) -> &'static str {
484        Self::static_name()
485    }
486
487    fn as_any(&self) -> &dyn Any {
488        self
489    }
490
491    fn properties(&self) -> &PlanProperties {
492        &self.cache
493    }
494
495    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
496        unimplemented!()
497    }
498
499    fn with_new_children(
500        self: Arc<Self>,
501        _: Vec<Arc<dyn ExecutionPlan>>,
502    ) -> Result<Arc<dyn ExecutionPlan>> {
503        unimplemented!()
504    }
505
506    /// Returns a stream which yields data
507    fn execute(
508        &self,
509        partition: usize,
510        _context: Arc<TaskContext>,
511    ) -> Result<SendableRecordBatchStream> {
512        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
513    }
514}
515
/// A mock execution plan that simply returns the provided statistics
///
/// It cannot be executed (`execute` is unimplemented); it only exists to feed
/// statistics to the code under test.
#[derive(Debug, Clone)]
pub struct StatisticsExec {
    /// statistics returned by `statistics()` and `partition_statistics(None)`
    stats: Statistics,
    /// output schema reported by this plan
    schema: Arc<Schema>,
    /// cached plan properties (built by `compute_properties`)
    cache: PlanProperties,
}
523impl StatisticsExec {
524    pub fn new(stats: Statistics, schema: Schema) -> Self {
525        assert_eq!(
526            stats
527                .column_statistics.len(), schema.fields().len(),
528            "if defined, the column statistics vector length should be the number of fields"
529        );
530        let cache = Self::compute_properties(Arc::new(schema.clone()));
531        Self {
532            stats,
533            schema: Arc::new(schema),
534            cache,
535        }
536    }
537
538    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
539    fn compute_properties(schema: SchemaRef) -> PlanProperties {
540        PlanProperties::new(
541            EquivalenceProperties::new(schema),
542            Partitioning::UnknownPartitioning(2),
543            EmissionType::Incremental,
544            Boundedness::Bounded,
545        )
546    }
547}
548
549impl DisplayAs for StatisticsExec {
550    fn fmt_as(
551        &self,
552        t: DisplayFormatType,
553        f: &mut std::fmt::Formatter,
554    ) -> std::fmt::Result {
555        match t {
556            DisplayFormatType::Default | DisplayFormatType::Verbose => {
557                write!(
558                    f,
559                    "StatisticsExec: col_count={}, row_count={:?}",
560                    self.schema.fields().len(),
561                    self.stats.num_rows,
562                )
563            }
564            DisplayFormatType::TreeRender => {
565                // TODO: collect info
566                write!(f, "")
567            }
568        }
569    }
570}
571
572impl ExecutionPlan for StatisticsExec {
573    fn name(&self) -> &'static str {
574        Self::static_name()
575    }
576
577    fn as_any(&self) -> &dyn Any {
578        self
579    }
580
581    fn properties(&self) -> &PlanProperties {
582        &self.cache
583    }
584
585    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
586        vec![]
587    }
588
589    fn with_new_children(
590        self: Arc<Self>,
591        _: Vec<Arc<dyn ExecutionPlan>>,
592    ) -> Result<Arc<dyn ExecutionPlan>> {
593        Ok(self)
594    }
595
596    fn execute(
597        &self,
598        _partition: usize,
599        _context: Arc<TaskContext>,
600    ) -> Result<SendableRecordBatchStream> {
601        unimplemented!("This plan only serves for testing statistics")
602    }
603
604    fn statistics(&self) -> Result<Statistics> {
605        Ok(self.stats.clone())
606    }
607
608    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
609        Ok(if partition.is_some() {
610            Statistics::new_unknown(&self.schema)
611        } else {
612            self.stats.clone()
613        })
614    }
615}
616
/// Execution plan that emits streams that block forever.
///
/// This is useful to test shutdown / cancellation behavior of certain execution plans.
#[derive(Debug)]
pub struct BlockingExec {
    /// Schema that is mocked by this plan.
    schema: SchemaRef,

    /// Ref-counting helper to check if the plan and the produced streams are still in memory.
    /// Each stream returned by `execute` holds a clone of this `Arc`.
    refs: Arc<()>,
    /// cached plan properties (built by `compute_properties`)
    cache: PlanProperties,
}
629
630impl BlockingExec {
631    /// Create new [`BlockingExec`] with a give schema and number of partitions.
632    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
633        let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
634        Self {
635            schema,
636            refs: Default::default(),
637            cache,
638        }
639    }
640
641    /// Weak pointer that can be used for ref-counting this execution plan and its streams.
642    ///
643    /// Use [`Weak::strong_count`] to determine if the plan itself and its streams are dropped (should be 0 in that
644    /// case). Note that tokio might take some time to cancel spawned tasks, so you need to wrap this check into a retry
645    /// loop. Use [`assert_strong_count_converges_to_zero`] to archive this.
646    pub fn refs(&self) -> Weak<()> {
647        Arc::downgrade(&self.refs)
648    }
649
650    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
651    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
652        PlanProperties::new(
653            EquivalenceProperties::new(schema),
654            Partitioning::UnknownPartitioning(n_partitions),
655            EmissionType::Incremental,
656            Boundedness::Bounded,
657        )
658    }
659}
660
661impl DisplayAs for BlockingExec {
662    fn fmt_as(
663        &self,
664        t: DisplayFormatType,
665        f: &mut std::fmt::Formatter,
666    ) -> std::fmt::Result {
667        match t {
668            DisplayFormatType::Default | DisplayFormatType::Verbose => {
669                write!(f, "BlockingExec",)
670            }
671            DisplayFormatType::TreeRender => {
672                // TODO: collect info
673                write!(f, "")
674            }
675        }
676    }
677}
678
679impl ExecutionPlan for BlockingExec {
680    fn name(&self) -> &'static str {
681        Self::static_name()
682    }
683
684    fn as_any(&self) -> &dyn Any {
685        self
686    }
687
688    fn properties(&self) -> &PlanProperties {
689        &self.cache
690    }
691
692    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
693        // this is a leaf node and has no children
694        vec![]
695    }
696
697    fn with_new_children(
698        self: Arc<Self>,
699        _: Vec<Arc<dyn ExecutionPlan>>,
700    ) -> Result<Arc<dyn ExecutionPlan>> {
701        internal_err!("Children cannot be replaced in {self:?}")
702    }
703
704    fn execute(
705        &self,
706        _partition: usize,
707        _context: Arc<TaskContext>,
708    ) -> Result<SendableRecordBatchStream> {
709        Ok(Box::pin(BlockingStream {
710            schema: Arc::clone(&self.schema),
711            _refs: Arc::clone(&self.refs),
712        }))
713    }
714}
715
/// A [`RecordBatchStream`] that is pending forever.
#[derive(Debug)]
pub struct BlockingStream {
    /// Schema mocked by this stream.
    schema: SchemaRef,

    /// Ref-counting helper to check if the stream is still in memory.
    _refs: Arc<()>,
}
725
726impl Stream for BlockingStream {
727    type Item = Result<RecordBatch>;
728
729    fn poll_next(
730        self: Pin<&mut Self>,
731        _cx: &mut Context<'_>,
732    ) -> Poll<Option<Self::Item>> {
733        Poll::Pending
734    }
735}
736
737impl RecordBatchStream for BlockingStream {
738    fn schema(&self) -> SchemaRef {
739        Arc::clone(&self.schema)
740    }
741}
742
743/// Asserts that the strong count of the given [`Weak`] pointer converges to zero.
744///
745/// This might take a while but has a timeout.
746pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
747    tokio::time::timeout(std::time::Duration::from_secs(10), async {
748        loop {
749            if dbg!(Weak::strong_count(&refs)) == 0 {
750                break;
751            }
752            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
753        }
754    })
755    .await
756    .unwrap();
757}
758
/// Execution plan that emits streams that panic.
///
/// This is useful to test panic handling of certain execution plans.
#[derive(Debug)]
pub struct PanicExec {
    /// Schema that is mocked by this plan.
    schema: SchemaRef,

    /// One entry per output partition (so the length is the number of
    /// partitions). Each entry is the number of empty record batches that
    /// partition produces before panicking.
    batches_until_panics: Vec<usize>,
    /// cached plan properties (built by `compute_properties`)
    cache: PlanProperties,
}
772
773impl PanicExec {
774    /// Create new [`PanicExec`] with a give schema and number of
775    /// partitions, which will each panic immediately.
776    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
777        let batches_until_panics = vec![0; n_partitions];
778        let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
779        Self {
780            schema,
781            batches_until_panics,
782            cache,
783        }
784    }
785
786    /// Set the number of batches prior to panic for a partition
787    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
788        self.batches_until_panics[partition] = count;
789        self
790    }
791
792    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
793    fn compute_properties(
794        schema: SchemaRef,
795        batches_until_panics: &[usize],
796    ) -> PlanProperties {
797        let num_partitions = batches_until_panics.len();
798        PlanProperties::new(
799            EquivalenceProperties::new(schema),
800            Partitioning::UnknownPartitioning(num_partitions),
801            EmissionType::Incremental,
802            Boundedness::Bounded,
803        )
804    }
805}
806
807impl DisplayAs for PanicExec {
808    fn fmt_as(
809        &self,
810        t: DisplayFormatType,
811        f: &mut std::fmt::Formatter,
812    ) -> std::fmt::Result {
813        match t {
814            DisplayFormatType::Default | DisplayFormatType::Verbose => {
815                write!(f, "PanicExec",)
816            }
817            DisplayFormatType::TreeRender => {
818                // TODO: collect info
819                write!(f, "")
820            }
821        }
822    }
823}
824
825impl ExecutionPlan for PanicExec {
826    fn name(&self) -> &'static str {
827        Self::static_name()
828    }
829
830    fn as_any(&self) -> &dyn Any {
831        self
832    }
833
834    fn properties(&self) -> &PlanProperties {
835        &self.cache
836    }
837
838    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
839        // this is a leaf node and has no children
840        vec![]
841    }
842
843    fn with_new_children(
844        self: Arc<Self>,
845        _: Vec<Arc<dyn ExecutionPlan>>,
846    ) -> Result<Arc<dyn ExecutionPlan>> {
847        internal_err!("Children cannot be replaced in {:?}", self)
848    }
849
850    fn execute(
851        &self,
852        partition: usize,
853        _context: Arc<TaskContext>,
854    ) -> Result<SendableRecordBatchStream> {
855        Ok(Box::pin(PanicStream {
856            partition,
857            batches_until_panic: self.batches_until_panics[partition],
858            schema: Arc::clone(&self.schema),
859            ready: false,
860        }))
861    }
862}
863
/// A [`RecordBatchStream`] that alternates between returning `Pending` (after
/// waking itself) and yielding an empty batch, and panics once
/// `batches_until_panic` batches have been produced.
///
/// Useful for testing the behavior of streams on panic
#[derive(Debug)]
struct PanicStream {
    /// Which partition was this
    partition: usize,
    /// How many batches will still be produced until panic
    batches_until_panic: usize,
    /// Schema mocked by this stream.
    schema: SchemaRef,
    /// Should the next poll return a batch (`true`) or `Pending` (`false`)?
    ready: bool,
}
879
880impl Stream for PanicStream {
881    type Item = Result<RecordBatch>;
882
883    fn poll_next(
884        mut self: Pin<&mut Self>,
885        cx: &mut Context<'_>,
886    ) -> Poll<Option<Self::Item>> {
887        if self.batches_until_panic > 0 {
888            if self.ready {
889                self.batches_until_panic -= 1;
890                self.ready = false;
891                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
892                return Poll::Ready(Some(Ok(batch)));
893            } else {
894                self.ready = true;
895                // get called again
896                cx.waker().wake_by_ref();
897                return Poll::Pending;
898            }
899        }
900        panic!("PanickingStream did panic: {}", self.partition)
901    }
902}
903
904impl RecordBatchStream for PanicStream {
905    fn schema(&self) -> SchemaRef {
906        Arc::clone(&self.schema)
907    }
908}