datafusion_physical_plan/test/
exec.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Simple iterator over batches, plus mock execution plans, for use in testing

20use std::{
21    any::Any,
22    pin::Pin,
23    sync::{Arc, Weak},
24    task::{Context, Poll},
25};
26
27use crate::{
28    common, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan,
29    Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
30    Statistics,
31};
32use crate::{
33    execution_plan::EmissionType,
34    stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
35};
36
37use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
38use arrow::record_batch::RecordBatch;
39use datafusion_common::{internal_err, DataFusionError, Result};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::EquivalenceProperties;
42
43use futures::Stream;
44use tokio::sync::Barrier;
45
/// Shared counter of how many batches have been returned so far.
///
/// Clones share the same underlying counter (it lives behind an `Arc`).
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    inner: Arc<std::sync::Mutex<usize>>,
}

impl BatchIndex {
    /// Returns the current value of the counter.
    pub fn value(&self) -> usize {
        *self.inner.lock().unwrap()
    }

    /// Increments the counter by one.
    pub fn incr(&self) {
        *self.inner.lock().unwrap() += 1;
    }
}
65
66/// Iterator over batches
67#[derive(Debug, Default)]
68pub struct TestStream {
69    /// Vector of record batches
70    data: Vec<RecordBatch>,
71    /// Index into the data that has been returned so far
72    index: BatchIndex,
73}
74
75impl TestStream {
76    /// Create an iterator for a vector of record batches. Assumes at
77    /// least one entry in data (for the schema)
78    pub fn new(data: Vec<RecordBatch>) -> Self {
79        Self {
80            data,
81            ..Default::default()
82        }
83    }
84
85    /// Return a handle to the index counter for this stream
86    pub fn index(&self) -> BatchIndex {
87        self.index.clone()
88    }
89}
90
91impl Stream for TestStream {
92    type Item = Result<RecordBatch>;
93
94    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95        let next_batch = self.index.value();
96
97        Poll::Ready(if next_batch < self.data.len() {
98            let next_batch = self.index.value();
99            self.index.incr();
100            Some(Ok(self.data[next_batch].clone()))
101        } else {
102            None
103        })
104    }
105
106    fn size_hint(&self) -> (usize, Option<usize>) {
107        (self.data.len(), Some(self.data.len()))
108    }
109}
110
111impl RecordBatchStream for TestStream {
112    /// Get the schema
113    fn schema(&self) -> SchemaRef {
114        self.data[0].schema()
115    }
116}
117
118/// A Mock ExecutionPlan that can be used for writing tests of other
119/// ExecutionPlans
120#[derive(Debug)]
121pub struct MockExec {
122    /// the results to send back
123    data: Vec<Result<RecordBatch>>,
124    schema: SchemaRef,
125    /// if true (the default), sends data using a separate task to ensure the
126    /// batches are not available without this stream yielding first
127    use_task: bool,
128    cache: PlanProperties,
129}
130
131impl MockExec {
132    /// Create a new `MockExec` with a single partition that returns
133    /// the specified `Results`s.
134    ///
135    /// By default, the batches are not produced immediately (the
136    /// caller has to actually yield and another task must run) to
137    /// ensure any poll loops are correct. This behavior can be
138    /// changed with `with_use_task`
139    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
140        let cache = Self::compute_properties(Arc::clone(&schema));
141        Self {
142            data,
143            schema,
144            use_task: true,
145            cache,
146        }
147    }
148
149    /// If `use_task` is true (the default) then the batches are sent
150    /// back using a separate task to ensure the underlying stream is
151    /// not immediately ready
152    pub fn with_use_task(mut self, use_task: bool) -> Self {
153        self.use_task = use_task;
154        self
155    }
156
157    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
158    fn compute_properties(schema: SchemaRef) -> PlanProperties {
159        PlanProperties::new(
160            EquivalenceProperties::new(schema),
161            Partitioning::UnknownPartitioning(1),
162            EmissionType::Incremental,
163            Boundedness::Bounded,
164        )
165    }
166}
167
168impl DisplayAs for MockExec {
169    fn fmt_as(
170        &self,
171        t: DisplayFormatType,
172        f: &mut std::fmt::Formatter,
173    ) -> std::fmt::Result {
174        match t {
175            DisplayFormatType::Default | DisplayFormatType::Verbose => {
176                write!(f, "MockExec")
177            }
178            DisplayFormatType::TreeRender => {
179                // TODO: collect info
180                write!(f, "")
181            }
182        }
183    }
184}
185
186impl ExecutionPlan for MockExec {
187    fn name(&self) -> &'static str {
188        Self::static_name()
189    }
190
191    fn as_any(&self) -> &dyn Any {
192        self
193    }
194
195    fn properties(&self) -> &PlanProperties {
196        &self.cache
197    }
198
199    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
200        vec![]
201    }
202
203    fn with_new_children(
204        self: Arc<Self>,
205        _: Vec<Arc<dyn ExecutionPlan>>,
206    ) -> Result<Arc<dyn ExecutionPlan>> {
207        unimplemented!()
208    }
209
210    /// Returns a stream which yields data
211    fn execute(
212        &self,
213        partition: usize,
214        _context: Arc<TaskContext>,
215    ) -> Result<SendableRecordBatchStream> {
216        assert_eq!(partition, 0);
217
218        // Result doesn't implement clone, so do it ourself
219        let data: Vec<_> = self
220            .data
221            .iter()
222            .map(|r| match r {
223                Ok(batch) => Ok(batch.clone()),
224                Err(e) => Err(clone_error(e)),
225            })
226            .collect();
227
228        if self.use_task {
229            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
230            // send data in order but in a separate task (to ensure
231            // the batches are not available without the stream
232            // yielding).
233            let tx = builder.tx();
234            builder.spawn(async move {
235                for batch in data {
236                    println!("Sending batch via delayed stream");
237                    if let Err(e) = tx.send(batch).await {
238                        println!("ERROR batch via delayed stream: {e}");
239                    }
240                }
241
242                Ok(())
243            });
244            // returned stream simply reads off the rx stream
245            Ok(builder.build())
246        } else {
247            // make an input that will error
248            let stream = futures::stream::iter(data);
249            Ok(Box::pin(RecordBatchStreamAdapter::new(
250                self.schema(),
251                stream,
252            )))
253        }
254    }
255
256    // Panics if one of the batches is an error
257    fn statistics(&self) -> Result<Statistics> {
258        self.partition_statistics(None)
259    }
260
261    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
262        if partition.is_some() {
263            return Ok(Statistics::new_unknown(&self.schema));
264        }
265        let data: Result<Vec<_>> = self
266            .data
267            .iter()
268            .map(|r| match r {
269                Ok(batch) => Ok(batch.clone()),
270                Err(e) => Err(clone_error(e)),
271            })
272            .collect();
273
274        let data = data?;
275
276        Ok(common::compute_record_batch_statistics(
277            &[data],
278            &self.schema,
279            None,
280        ))
281    }
282}
283
284fn clone_error(e: &DataFusionError) -> DataFusionError {
285    use DataFusionError::*;
286    match e {
287        Execution(msg) => Execution(msg.to_string()),
288        _ => unimplemented!(),
289    }
290}
291
292/// A Mock ExecutionPlan that does not start producing input until a
293/// barrier is called
294#[derive(Debug)]
295pub struct BarrierExec {
296    /// partitions to send back
297    data: Vec<Vec<RecordBatch>>,
298    schema: SchemaRef,
299
300    /// all streams wait on this barrier to produce
301    barrier: Arc<Barrier>,
302    cache: PlanProperties,
303}
304
305impl BarrierExec {
306    /// Create a new exec with some number of partitions.
307    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
308        // wait for all streams and the input
309        let barrier = Arc::new(Barrier::new(data.len() + 1));
310        let cache = Self::compute_properties(Arc::clone(&schema), &data);
311        Self {
312            data,
313            schema,
314            barrier,
315            cache,
316        }
317    }
318
319    /// wait until all the input streams and this function is ready
320    pub async fn wait(&self) {
321        println!("BarrierExec::wait waiting on barrier");
322        self.barrier.wait().await;
323        println!("BarrierExec::wait done waiting");
324    }
325
326    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
327    fn compute_properties(
328        schema: SchemaRef,
329        data: &[Vec<RecordBatch>],
330    ) -> PlanProperties {
331        PlanProperties::new(
332            EquivalenceProperties::new(schema),
333            Partitioning::UnknownPartitioning(data.len()),
334            EmissionType::Incremental,
335            Boundedness::Bounded,
336        )
337    }
338}
339
340impl DisplayAs for BarrierExec {
341    fn fmt_as(
342        &self,
343        t: DisplayFormatType,
344        f: &mut std::fmt::Formatter,
345    ) -> std::fmt::Result {
346        match t {
347            DisplayFormatType::Default | DisplayFormatType::Verbose => {
348                write!(f, "BarrierExec")
349            }
350            DisplayFormatType::TreeRender => {
351                // TODO: collect info
352                write!(f, "")
353            }
354        }
355    }
356}
357
358impl ExecutionPlan for BarrierExec {
359    fn name(&self) -> &'static str {
360        Self::static_name()
361    }
362
363    fn as_any(&self) -> &dyn Any {
364        self
365    }
366
367    fn properties(&self) -> &PlanProperties {
368        &self.cache
369    }
370
371    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
372        unimplemented!()
373    }
374
375    fn with_new_children(
376        self: Arc<Self>,
377        _: Vec<Arc<dyn ExecutionPlan>>,
378    ) -> Result<Arc<dyn ExecutionPlan>> {
379        unimplemented!()
380    }
381
382    /// Returns a stream which yields data
383    fn execute(
384        &self,
385        partition: usize,
386        _context: Arc<TaskContext>,
387    ) -> Result<SendableRecordBatchStream> {
388        assert!(partition < self.data.len());
389
390        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
391
392        // task simply sends data in order after barrier is reached
393        let data = self.data[partition].clone();
394        let b = Arc::clone(&self.barrier);
395        let tx = builder.tx();
396        builder.spawn(async move {
397            println!("Partition {partition} waiting on barrier");
398            b.wait().await;
399            for batch in data {
400                println!("Partition {partition} sending batch");
401                if let Err(e) = tx.send(Ok(batch)).await {
402                    println!("ERROR batch via barrier stream stream: {e}");
403                }
404            }
405
406            Ok(())
407        });
408
409        // returned stream simply reads off the rx stream
410        Ok(builder.build())
411    }
412
413    fn statistics(&self) -> Result<Statistics> {
414        self.partition_statistics(None)
415    }
416
417    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
418        if partition.is_some() {
419            return Ok(Statistics::new_unknown(&self.schema));
420        }
421        Ok(common::compute_record_batch_statistics(
422            &self.data,
423            &self.schema,
424            None,
425        ))
426    }
427}
428
429/// A mock execution plan that errors on a call to execute
430#[derive(Debug)]
431pub struct ErrorExec {
432    cache: PlanProperties,
433}
434
435impl Default for ErrorExec {
436    fn default() -> Self {
437        Self::new()
438    }
439}
440
441impl ErrorExec {
442    pub fn new() -> Self {
443        let schema = Arc::new(Schema::new(vec![Field::new(
444            "dummy",
445            DataType::Int64,
446            true,
447        )]));
448        let cache = Self::compute_properties(schema);
449        Self { cache }
450    }
451
452    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
453    fn compute_properties(schema: SchemaRef) -> PlanProperties {
454        PlanProperties::new(
455            EquivalenceProperties::new(schema),
456            Partitioning::UnknownPartitioning(1),
457            EmissionType::Incremental,
458            Boundedness::Bounded,
459        )
460    }
461}
462
463impl DisplayAs for ErrorExec {
464    fn fmt_as(
465        &self,
466        t: DisplayFormatType,
467        f: &mut std::fmt::Formatter,
468    ) -> std::fmt::Result {
469        match t {
470            DisplayFormatType::Default | DisplayFormatType::Verbose => {
471                write!(f, "ErrorExec")
472            }
473            DisplayFormatType::TreeRender => {
474                // TODO: collect info
475                write!(f, "")
476            }
477        }
478    }
479}
480
481impl ExecutionPlan for ErrorExec {
482    fn name(&self) -> &'static str {
483        Self::static_name()
484    }
485
486    fn as_any(&self) -> &dyn Any {
487        self
488    }
489
490    fn properties(&self) -> &PlanProperties {
491        &self.cache
492    }
493
494    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
495        unimplemented!()
496    }
497
498    fn with_new_children(
499        self: Arc<Self>,
500        _: Vec<Arc<dyn ExecutionPlan>>,
501    ) -> Result<Arc<dyn ExecutionPlan>> {
502        unimplemented!()
503    }
504
505    /// Returns a stream which yields data
506    fn execute(
507        &self,
508        partition: usize,
509        _context: Arc<TaskContext>,
510    ) -> Result<SendableRecordBatchStream> {
511        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
512    }
513}
514
515/// A mock execution plan that simply returns the provided statistics
516#[derive(Debug, Clone)]
517pub struct StatisticsExec {
518    stats: Statistics,
519    schema: Arc<Schema>,
520    cache: PlanProperties,
521}
522impl StatisticsExec {
523    pub fn new(stats: Statistics, schema: Schema) -> Self {
524        assert_eq!(
525            stats
526                .column_statistics.len(), schema.fields().len(),
527            "if defined, the column statistics vector length should be the number of fields"
528        );
529        let cache = Self::compute_properties(Arc::new(schema.clone()));
530        Self {
531            stats,
532            schema: Arc::new(schema),
533            cache,
534        }
535    }
536
537    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
538    fn compute_properties(schema: SchemaRef) -> PlanProperties {
539        PlanProperties::new(
540            EquivalenceProperties::new(schema),
541            Partitioning::UnknownPartitioning(2),
542            EmissionType::Incremental,
543            Boundedness::Bounded,
544        )
545    }
546}
547
548impl DisplayAs for StatisticsExec {
549    fn fmt_as(
550        &self,
551        t: DisplayFormatType,
552        f: &mut std::fmt::Formatter,
553    ) -> std::fmt::Result {
554        match t {
555            DisplayFormatType::Default | DisplayFormatType::Verbose => {
556                write!(
557                    f,
558                    "StatisticsExec: col_count={}, row_count={:?}",
559                    self.schema.fields().len(),
560                    self.stats.num_rows,
561                )
562            }
563            DisplayFormatType::TreeRender => {
564                // TODO: collect info
565                write!(f, "")
566            }
567        }
568    }
569}
570
571impl ExecutionPlan for StatisticsExec {
572    fn name(&self) -> &'static str {
573        Self::static_name()
574    }
575
576    fn as_any(&self) -> &dyn Any {
577        self
578    }
579
580    fn properties(&self) -> &PlanProperties {
581        &self.cache
582    }
583
584    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
585        vec![]
586    }
587
588    fn with_new_children(
589        self: Arc<Self>,
590        _: Vec<Arc<dyn ExecutionPlan>>,
591    ) -> Result<Arc<dyn ExecutionPlan>> {
592        Ok(self)
593    }
594
595    fn execute(
596        &self,
597        _partition: usize,
598        _context: Arc<TaskContext>,
599    ) -> Result<SendableRecordBatchStream> {
600        unimplemented!("This plan only serves for testing statistics")
601    }
602
603    fn statistics(&self) -> Result<Statistics> {
604        Ok(self.stats.clone())
605    }
606
607    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
608        Ok(if partition.is_some() {
609            Statistics::new_unknown(&self.schema)
610        } else {
611            self.stats.clone()
612        })
613    }
614}
615
616/// Execution plan that emits streams that block forever.
617///
618/// This is useful to test shutdown / cancellation behavior of certain execution plans.
619#[derive(Debug)]
620pub struct BlockingExec {
621    /// Schema that is mocked by this plan.
622    schema: SchemaRef,
623
624    /// Ref-counting helper to check if the plan and the produced stream are still in memory.
625    refs: Arc<()>,
626    cache: PlanProperties,
627}
628
629impl BlockingExec {
630    /// Create new [`BlockingExec`] with a give schema and number of partitions.
631    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
632        let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
633        Self {
634            schema,
635            refs: Default::default(),
636            cache,
637        }
638    }
639
640    /// Weak pointer that can be used for ref-counting this execution plan and its streams.
641    ///
642    /// Use [`Weak::strong_count`] to determine if the plan itself and its streams are dropped (should be 0 in that
643    /// case). Note that tokio might take some time to cancel spawned tasks, so you need to wrap this check into a retry
644    /// loop. Use [`assert_strong_count_converges_to_zero`] to archive this.
645    pub fn refs(&self) -> Weak<()> {
646        Arc::downgrade(&self.refs)
647    }
648
649    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
650    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
651        PlanProperties::new(
652            EquivalenceProperties::new(schema),
653            Partitioning::UnknownPartitioning(n_partitions),
654            EmissionType::Incremental,
655            Boundedness::Bounded,
656        )
657    }
658}
659
660impl DisplayAs for BlockingExec {
661    fn fmt_as(
662        &self,
663        t: DisplayFormatType,
664        f: &mut std::fmt::Formatter,
665    ) -> std::fmt::Result {
666        match t {
667            DisplayFormatType::Default | DisplayFormatType::Verbose => {
668                write!(f, "BlockingExec",)
669            }
670            DisplayFormatType::TreeRender => {
671                // TODO: collect info
672                write!(f, "")
673            }
674        }
675    }
676}
677
678impl ExecutionPlan for BlockingExec {
679    fn name(&self) -> &'static str {
680        Self::static_name()
681    }
682
683    fn as_any(&self) -> &dyn Any {
684        self
685    }
686
687    fn properties(&self) -> &PlanProperties {
688        &self.cache
689    }
690
691    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
692        // this is a leaf node and has no children
693        vec![]
694    }
695
696    fn with_new_children(
697        self: Arc<Self>,
698        _: Vec<Arc<dyn ExecutionPlan>>,
699    ) -> Result<Arc<dyn ExecutionPlan>> {
700        internal_err!("Children cannot be replaced in {self:?}")
701    }
702
703    fn execute(
704        &self,
705        _partition: usize,
706        _context: Arc<TaskContext>,
707    ) -> Result<SendableRecordBatchStream> {
708        Ok(Box::pin(BlockingStream {
709            schema: Arc::clone(&self.schema),
710            _refs: Arc::clone(&self.refs),
711        }))
712    }
713}
714
715/// A [`RecordBatchStream`] that is pending forever.
716#[derive(Debug)]
717pub struct BlockingStream {
718    /// Schema mocked by this stream.
719    schema: SchemaRef,
720
721    /// Ref-counting helper to check if the stream are still in memory.
722    _refs: Arc<()>,
723}
724
725impl Stream for BlockingStream {
726    type Item = Result<RecordBatch>;
727
728    fn poll_next(
729        self: Pin<&mut Self>,
730        _cx: &mut Context<'_>,
731    ) -> Poll<Option<Self::Item>> {
732        Poll::Pending
733    }
734}
735
736impl RecordBatchStream for BlockingStream {
737    fn schema(&self) -> SchemaRef {
738        Arc::clone(&self.schema)
739    }
740}
741
742/// Asserts that the strong count of the given [`Weak`] pointer converges to zero.
743///
744/// This might take a while but has a timeout.
745pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
746    tokio::time::timeout(std::time::Duration::from_secs(10), async {
747        loop {
748            if dbg!(Weak::strong_count(&refs)) == 0 {
749                break;
750            }
751            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
752        }
753    })
754    .await
755    .unwrap();
756}
757
758/// Execution plan that emits streams that panics.
759///
760/// This is useful to test panic handling of certain execution plans.
761#[derive(Debug)]
762pub struct PanicExec {
763    /// Schema that is mocked by this plan.
764    schema: SchemaRef,
765
766    /// Number of output partitions. Each partition will produce this
767    /// many empty output record batches prior to panicking
768    batches_until_panics: Vec<usize>,
769    cache: PlanProperties,
770}
771
772impl PanicExec {
773    /// Create new [`PanicExec`] with a give schema and number of
774    /// partitions, which will each panic immediately.
775    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
776        let batches_until_panics = vec![0; n_partitions];
777        let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
778        Self {
779            schema,
780            batches_until_panics,
781            cache,
782        }
783    }
784
785    /// Set the number of batches prior to panic for a partition
786    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
787        self.batches_until_panics[partition] = count;
788        self
789    }
790
791    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
792    fn compute_properties(
793        schema: SchemaRef,
794        batches_until_panics: &[usize],
795    ) -> PlanProperties {
796        let num_partitions = batches_until_panics.len();
797        PlanProperties::new(
798            EquivalenceProperties::new(schema),
799            Partitioning::UnknownPartitioning(num_partitions),
800            EmissionType::Incremental,
801            Boundedness::Bounded,
802        )
803    }
804}
805
806impl DisplayAs for PanicExec {
807    fn fmt_as(
808        &self,
809        t: DisplayFormatType,
810        f: &mut std::fmt::Formatter,
811    ) -> std::fmt::Result {
812        match t {
813            DisplayFormatType::Default | DisplayFormatType::Verbose => {
814                write!(f, "PanicExec",)
815            }
816            DisplayFormatType::TreeRender => {
817                // TODO: collect info
818                write!(f, "")
819            }
820        }
821    }
822}
823
824impl ExecutionPlan for PanicExec {
825    fn name(&self) -> &'static str {
826        Self::static_name()
827    }
828
829    fn as_any(&self) -> &dyn Any {
830        self
831    }
832
833    fn properties(&self) -> &PlanProperties {
834        &self.cache
835    }
836
837    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
838        // this is a leaf node and has no children
839        vec![]
840    }
841
842    fn with_new_children(
843        self: Arc<Self>,
844        _: Vec<Arc<dyn ExecutionPlan>>,
845    ) -> Result<Arc<dyn ExecutionPlan>> {
846        internal_err!("Children cannot be replaced in {:?}", self)
847    }
848
849    fn execute(
850        &self,
851        partition: usize,
852        _context: Arc<TaskContext>,
853    ) -> Result<SendableRecordBatchStream> {
854        Ok(Box::pin(PanicStream {
855            partition,
856            batches_until_panic: self.batches_until_panics[partition],
857            schema: Arc::clone(&self.schema),
858            ready: false,
859        }))
860    }
861}
862
863/// A [`RecordBatchStream`] that yields every other batch and panics
864/// after `batches_until_panic` batches have been produced.
865///
866/// Useful for testing the behavior of streams on panic
867#[derive(Debug)]
868struct PanicStream {
869    /// Which partition was this
870    partition: usize,
871    /// How may batches will be produced until panic
872    batches_until_panic: usize,
873    /// Schema mocked by this stream.
874    schema: SchemaRef,
875    /// Should we return ready ?
876    ready: bool,
877}
878
879impl Stream for PanicStream {
880    type Item = Result<RecordBatch>;
881
882    fn poll_next(
883        mut self: Pin<&mut Self>,
884        cx: &mut Context<'_>,
885    ) -> Poll<Option<Self::Item>> {
886        if self.batches_until_panic > 0 {
887            if self.ready {
888                self.batches_until_panic -= 1;
889                self.ready = false;
890                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
891                return Poll::Ready(Some(Ok(batch)));
892            } else {
893                self.ready = true;
894                // get called again
895                cx.waker().wake_by_ref();
896                return Poll::Pending;
897            }
898        }
899        panic!("PanickingStream did panic: {}", self.partition)
900    }
901}
902
903impl RecordBatchStream for PanicStream {
904    fn schema(&self) -> SchemaRef {
905        Arc::clone(&self.schema)
906    }
907}