// datafusion_physical_plan/test/exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Simple iterator over batches for use in testing
19
20use crate::{
21    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
22    RecordBatchStream, SendableRecordBatchStream, Statistics, common,
23    execution_plan::Boundedness,
24};
25use crate::{
26    execution_plan::EmissionType,
27    stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
28};
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::{
31    any::Any,
32    pin::Pin,
33    sync::{Arc, Weak},
34    task::{Context, Poll},
35};
36
37use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
38use arrow::record_batch::RecordBatch;
39use datafusion_common::{DataFusionError, Result, internal_err};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::EquivalenceProperties;
42
43use futures::Stream;
44use tokio::sync::Barrier;
45
/// Index into the data that has been returned so far
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    // Shared, mutex-guarded counter: clones of a `BatchIndex` all observe
    // and update the same underlying value.
    inner: Arc<std::sync::Mutex<usize>>,
}

impl BatchIndex {
    /// Return the current index
    pub fn value(&self) -> usize {
        *self.inner.lock().unwrap()
    }

    /// Increment the current index by one
    pub fn incr(&self) {
        *self.inner.lock().unwrap() += 1;
    }
}
65
66/// Iterator over batches
67#[derive(Debug, Default)]
68pub struct TestStream {
69    /// Vector of record batches
70    data: Vec<RecordBatch>,
71    /// Index into the data that has been returned so far
72    index: BatchIndex,
73}
74
75impl TestStream {
76    /// Create an iterator for a vector of record batches. Assumes at
77    /// least one entry in data (for the schema)
78    pub fn new(data: Vec<RecordBatch>) -> Self {
79        Self {
80            data,
81            ..Default::default()
82        }
83    }
84
85    /// Return a handle to the index counter for this stream
86    pub fn index(&self) -> BatchIndex {
87        self.index.clone()
88    }
89}
90
91impl Stream for TestStream {
92    type Item = Result<RecordBatch>;
93
94    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95        let next_batch = self.index.value();
96
97        Poll::Ready(if next_batch < self.data.len() {
98            let next_batch = self.index.value();
99            self.index.incr();
100            Some(Ok(self.data[next_batch].clone()))
101        } else {
102            None
103        })
104    }
105
106    fn size_hint(&self) -> (usize, Option<usize>) {
107        (self.data.len(), Some(self.data.len()))
108    }
109}
110
impl RecordBatchStream for TestStream {
    /// Get the schema from the first batch.
    ///
    /// Panics if `data` is empty — `TestStream::new` documents that at
    /// least one batch is required precisely for this reason.
    fn schema(&self) -> SchemaRef {
        self.data[0].schema()
    }
}
117
118/// A Mock ExecutionPlan that can be used for writing tests of other
119/// ExecutionPlans
120#[derive(Debug)]
121pub struct MockExec {
122    /// the results to send back
123    data: Vec<Result<RecordBatch>>,
124    schema: SchemaRef,
125    /// if true (the default), sends data using a separate task to ensure the
126    /// batches are not available without this stream yielding first
127    use_task: bool,
128    cache: Arc<PlanProperties>,
129}
130
131impl MockExec {
132    /// Create a new `MockExec` with a single partition that returns
133    /// the specified `Results`s.
134    ///
135    /// By default, the batches are not produced immediately (the
136    /// caller has to actually yield and another task must run) to
137    /// ensure any poll loops are correct. This behavior can be
138    /// changed with `with_use_task`
139    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
140        let cache = Self::compute_properties(Arc::clone(&schema));
141        Self {
142            data,
143            schema,
144            use_task: true,
145            cache: Arc::new(cache),
146        }
147    }
148
149    /// If `use_task` is true (the default) then the batches are sent
150    /// back using a separate task to ensure the underlying stream is
151    /// not immediately ready
152    pub fn with_use_task(mut self, use_task: bool) -> Self {
153        self.use_task = use_task;
154        self
155    }
156
157    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
158    fn compute_properties(schema: SchemaRef) -> PlanProperties {
159        PlanProperties::new(
160            EquivalenceProperties::new(schema),
161            Partitioning::UnknownPartitioning(1),
162            EmissionType::Incremental,
163            Boundedness::Bounded,
164        )
165    }
166}
167
168impl DisplayAs for MockExec {
169    fn fmt_as(
170        &self,
171        t: DisplayFormatType,
172        f: &mut std::fmt::Formatter,
173    ) -> std::fmt::Result {
174        match t {
175            DisplayFormatType::Default | DisplayFormatType::Verbose => {
176                write!(f, "MockExec")
177            }
178            DisplayFormatType::TreeRender => {
179                // TODO: collect info
180                write!(f, "")
181            }
182        }
183    }
184}
185
impl ExecutionPlan for MockExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    // Leaf node: no children
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // Test-only node; replacing children is never needed
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns a stream which yields the configured data (and errors).
    ///
    /// Only partition 0 exists; any other partition is a caller bug.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(partition, 0);

        // Result doesn't implement Clone, so copy batches/errors ourselves
        // (clone_error panics for anything but `Execution` errors)
        let data: Vec<_> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        if self.use_task {
            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
            // send data in order but in a separate task (to ensure
            // the batches are not available without the stream
            // yielding).
            let tx = builder.tx();
            builder.spawn(async move {
                for batch in data {
                    println!("Sending batch via delayed stream");
                    if let Err(e) = tx.send(batch).await {
                        println!("ERROR batch via delayed stream: {e}");
                    }
                }

                Ok(())
            });
            // returned stream simply reads off the rx stream
            Ok(builder.build())
        } else {
            // deliver batches (and errors) through an adapter stream that is
            // ready immediately, with no intervening task
            let stream = futures::stream::iter(data);
            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                stream,
            )))
        }
    }

    // Statistics are only known for the whole plan (partition == None).
    // If any entry in `data` is an error, this returns that error via `?`
    // (and `clone_error` panics for non-`Execution` error variants).
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        let data: Result<Vec<_>> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        let data = data?;

        Ok(common::compute_record_batch_statistics(
            &[data],
            &self.schema,
            None,
        ))
    }
}
279
280fn clone_error(e: &DataFusionError) -> DataFusionError {
281    use DataFusionError::*;
282    match e {
283        Execution(msg) => Execution(msg.to_string()),
284        _ => unimplemented!(),
285    }
286}
287
/// A Mock ExecutionPlan that does not start producing input until a
/// barrier is called
#[derive(Debug)]
pub struct BarrierExec {
    /// partitions to send back
    data: Vec<Vec<RecordBatch>>,
    schema: SchemaRef,

    /// all streams wait on this barrier to produce
    start_data_barrier: Option<Arc<Barrier>>,

    /// the streams wait on this barrier before returning Poll::Ready(None);
    /// the counter tracks how many partitions have reached it
    finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,

    /// cached plan properties (partitioning, emission type, ...)
    cache: Arc<PlanProperties>,

    /// if true, print progress messages while producing data
    log: bool,
}
306
307impl BarrierExec {
308    /// Create a new exec with some number of partitions.
309    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
310        // wait for all streams and the input
311        let barrier = Some(Arc::new(Barrier::new(data.len() + 1)));
312        let cache = Self::compute_properties(Arc::clone(&schema), &data);
313        Self {
314            data,
315            schema,
316            start_data_barrier: barrier,
317            cache: Arc::new(cache),
318            finish_barrier: None,
319            log: true,
320        }
321    }
322
323    pub fn with_log(mut self, log: bool) -> Self {
324        self.log = log;
325        self
326    }
327
328    pub fn without_start_barrier(mut self) -> Self {
329        self.start_data_barrier = None;
330        self
331    }
332
333    pub fn with_finish_barrier(mut self) -> Self {
334        let barrier = Arc::new((
335            // wait for all streams and the input
336            Barrier::new(self.data.len() + 1),
337            AtomicUsize::new(0),
338        ));
339
340        self.finish_barrier = Some(barrier);
341        self
342    }
343
344    /// wait until all the input streams and this function is ready
345    pub async fn wait(&self) {
346        let barrier = &self
347            .start_data_barrier
348            .as_ref()
349            .expect("Must only be called when having a start barrier");
350        if self.log {
351            println!("BarrierExec::wait waiting on barrier");
352        }
353        barrier.wait().await;
354        if self.log {
355            println!("BarrierExec::wait done waiting");
356        }
357    }
358
359    pub async fn wait_finish(&self) {
360        let (barrier, _) = &self
361            .finish_barrier
362            .as_deref()
363            .expect("Must only be called when having a finish barrier");
364
365        if self.log {
366            println!("BarrierExec::wait_finish waiting on barrier");
367        }
368        barrier.wait().await;
369        if self.log {
370            println!("BarrierExec::wait_finish done waiting");
371        }
372    }
373
374    /// Return true if the finish barrier has been reached in all partitions
375    pub fn is_finish_barrier_reached(&self) -> bool {
376        let (_, reached_finish) = self
377            .finish_barrier
378            .as_deref()
379            .expect("Must only be called when having finish barrier");
380
381        reached_finish.load(Ordering::Relaxed) == self.data.len()
382    }
383
384    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
385    fn compute_properties(
386        schema: SchemaRef,
387        data: &[Vec<RecordBatch>],
388    ) -> PlanProperties {
389        PlanProperties::new(
390            EquivalenceProperties::new(schema),
391            Partitioning::UnknownPartitioning(data.len()),
392            EmissionType::Incremental,
393            Boundedness::Bounded,
394        )
395    }
396}
397
398impl DisplayAs for BarrierExec {
399    fn fmt_as(
400        &self,
401        t: DisplayFormatType,
402        f: &mut std::fmt::Formatter,
403    ) -> std::fmt::Result {
404        match t {
405            DisplayFormatType::Default | DisplayFormatType::Verbose => {
406                write!(f, "BarrierExec")
407            }
408            DisplayFormatType::TreeRender => {
409                // TODO: collect info
410                write!(f, "")
411            }
412        }
413    }
414}
415
416impl ExecutionPlan for BarrierExec {
417    fn name(&self) -> &'static str {
418        Self::static_name()
419    }
420
421    fn as_any(&self) -> &dyn Any {
422        self
423    }
424
425    fn properties(&self) -> &Arc<PlanProperties> {
426        &self.cache
427    }
428
429    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
430        unimplemented!()
431    }
432
433    fn with_new_children(
434        self: Arc<Self>,
435        _: Vec<Arc<dyn ExecutionPlan>>,
436    ) -> Result<Arc<dyn ExecutionPlan>> {
437        unimplemented!()
438    }
439
440    /// Returns a stream which yields data
441    fn execute(
442        &self,
443        partition: usize,
444        _context: Arc<TaskContext>,
445    ) -> Result<SendableRecordBatchStream> {
446        assert!(partition < self.data.len());
447
448        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
449
450        // task simply sends data in order after barrier is reached
451        let data = self.data[partition].clone();
452        let start_barrier = self.start_data_barrier.as_ref().map(Arc::clone);
453        let finish_barrier = self.finish_barrier.as_ref().map(Arc::clone);
454        let log = self.log;
455        let tx = builder.tx();
456        builder.spawn(async move {
457            if let Some(barrier) = start_barrier {
458                if log {
459                    println!("Partition {partition} waiting on barrier");
460                }
461                barrier.wait().await;
462            }
463            for batch in data {
464                if log {
465                    println!("Partition {partition} sending batch");
466                }
467                if let Err(e) = tx.send(Ok(batch)).await {
468                    println!("ERROR batch via barrier stream stream: {e}");
469                }
470            }
471            if let Some((barrier, reached_finish)) = finish_barrier.as_deref() {
472                if log {
473                    println!("Partition {partition} waiting on finish barrier");
474                }
475                reached_finish.fetch_add(1, Ordering::Relaxed);
476                barrier.wait().await;
477            }
478
479            Ok(())
480        });
481
482        // returned stream simply reads off the rx stream
483        Ok(builder.build())
484    }
485
486    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
487        if partition.is_some() {
488            return Ok(Statistics::new_unknown(&self.schema));
489        }
490        Ok(common::compute_record_batch_statistics(
491            &self.data,
492            &self.schema,
493            None,
494        ))
495    }
496}
497
/// A mock execution plan that errors on a call to execute
#[derive(Debug)]
pub struct ErrorExec {
    /// cached plan properties (single unknown partition)
    cache: Arc<PlanProperties>,
}

impl Default for ErrorExec {
    /// Equivalent to [`ErrorExec::new`]
    fn default() -> Self {
        Self::new()
    }
}
509
510impl ErrorExec {
511    pub fn new() -> Self {
512        let schema = Arc::new(Schema::new(vec![Field::new(
513            "dummy",
514            DataType::Int64,
515            true,
516        )]));
517        let cache = Self::compute_properties(schema);
518        Self {
519            cache: Arc::new(cache),
520        }
521    }
522
523    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
524    fn compute_properties(schema: SchemaRef) -> PlanProperties {
525        PlanProperties::new(
526            EquivalenceProperties::new(schema),
527            Partitioning::UnknownPartitioning(1),
528            EmissionType::Incremental,
529            Boundedness::Bounded,
530        )
531    }
532}
533
534impl DisplayAs for ErrorExec {
535    fn fmt_as(
536        &self,
537        t: DisplayFormatType,
538        f: &mut std::fmt::Formatter,
539    ) -> std::fmt::Result {
540        match t {
541            DisplayFormatType::Default | DisplayFormatType::Verbose => {
542                write!(f, "ErrorExec")
543            }
544            DisplayFormatType::TreeRender => {
545                // TODO: collect info
546                write!(f, "")
547            }
548        }
549    }
550}
551
impl ExecutionPlan for ErrorExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    // Test-only node; never inspected for children
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Always returns an internal error (naming the requested partition) —
    /// this node exists to test error propagation
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
    }
}
585
586/// A mock execution plan that simply returns the provided statistics
587#[derive(Debug, Clone)]
588pub struct StatisticsExec {
589    stats: Statistics,
590    schema: Arc<Schema>,
591    cache: Arc<PlanProperties>,
592}
593impl StatisticsExec {
594    pub fn new(stats: Statistics, schema: Schema) -> Self {
595        assert_eq!(
596            stats.column_statistics.len(),
597            schema.fields().len(),
598            "if defined, the column statistics vector length should be the number of fields"
599        );
600        let cache = Self::compute_properties(Arc::new(schema.clone()));
601        Self {
602            stats,
603            schema: Arc::new(schema),
604            cache: Arc::new(cache),
605        }
606    }
607
608    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
609    fn compute_properties(schema: SchemaRef) -> PlanProperties {
610        PlanProperties::new(
611            EquivalenceProperties::new(schema),
612            Partitioning::UnknownPartitioning(2),
613            EmissionType::Incremental,
614            Boundedness::Bounded,
615        )
616    }
617}
618
619impl DisplayAs for StatisticsExec {
620    fn fmt_as(
621        &self,
622        t: DisplayFormatType,
623        f: &mut std::fmt::Formatter,
624    ) -> std::fmt::Result {
625        match t {
626            DisplayFormatType::Default | DisplayFormatType::Verbose => {
627                write!(
628                    f,
629                    "StatisticsExec: col_count={}, row_count={:?}",
630                    self.schema.fields().len(),
631                    self.stats.num_rows,
632                )
633            }
634            DisplayFormatType::TreeRender => {
635                // TODO: collect info
636                write!(f, "")
637            }
638        }
639    }
640}
641
642impl ExecutionPlan for StatisticsExec {
643    fn name(&self) -> &'static str {
644        Self::static_name()
645    }
646
647    fn as_any(&self) -> &dyn Any {
648        self
649    }
650
651    fn properties(&self) -> &Arc<PlanProperties> {
652        &self.cache
653    }
654
655    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
656        vec![]
657    }
658
659    fn with_new_children(
660        self: Arc<Self>,
661        _: Vec<Arc<dyn ExecutionPlan>>,
662    ) -> Result<Arc<dyn ExecutionPlan>> {
663        Ok(self)
664    }
665
666    fn execute(
667        &self,
668        _partition: usize,
669        _context: Arc<TaskContext>,
670    ) -> Result<SendableRecordBatchStream> {
671        unimplemented!("This plan only serves for testing statistics")
672    }
673
674    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
675        Ok(if partition.is_some() {
676            Statistics::new_unknown(&self.schema)
677        } else {
678            self.stats.clone()
679        })
680    }
681}
682
683/// Execution plan that emits streams that block forever.
684///
685/// This is useful to test shutdown / cancellation behavior of certain execution plans.
686#[derive(Debug)]
687pub struct BlockingExec {
688    /// Schema that is mocked by this plan.
689    schema: SchemaRef,
690
691    /// Ref-counting helper to check if the plan and the produced stream are still in memory.
692    refs: Arc<()>,
693    cache: Arc<PlanProperties>,
694}
695
696impl BlockingExec {
697    /// Create new [`BlockingExec`] with a give schema and number of partitions.
698    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
699        let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
700        Self {
701            schema,
702            refs: Default::default(),
703            cache: Arc::new(cache),
704        }
705    }
706
707    /// Weak pointer that can be used for ref-counting this execution plan and its streams.
708    ///
709    /// Use [`Weak::strong_count`] to determine if the plan itself and its streams are dropped (should be 0 in that
710    /// case). Note that tokio might take some time to cancel spawned tasks, so you need to wrap this check into a retry
711    /// loop. Use [`assert_strong_count_converges_to_zero`] to archive this.
712    pub fn refs(&self) -> Weak<()> {
713        Arc::downgrade(&self.refs)
714    }
715
716    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
717    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
718        PlanProperties::new(
719            EquivalenceProperties::new(schema),
720            Partitioning::UnknownPartitioning(n_partitions),
721            EmissionType::Incremental,
722            Boundedness::Bounded,
723        )
724    }
725}
726
727impl DisplayAs for BlockingExec {
728    fn fmt_as(
729        &self,
730        t: DisplayFormatType,
731        f: &mut std::fmt::Formatter,
732    ) -> std::fmt::Result {
733        match t {
734            DisplayFormatType::Default | DisplayFormatType::Verbose => {
735                write!(f, "BlockingExec",)
736            }
737            DisplayFormatType::TreeRender => {
738                // TODO: collect info
739                write!(f, "")
740            }
741        }
742    }
743}
744
impl ExecutionPlan for BlockingExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // this is a leaf node and has no children
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("Children cannot be replaced in {self:?}")
    }

    /// Returns a [`BlockingStream`] that never yields; it holds a clone of
    /// `refs`, so [`BlockingExec::refs`] can observe whether the stream is
    /// still alive
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(BlockingStream {
            schema: Arc::clone(&self.schema),
            _refs: Arc::clone(&self.refs),
        }))
    }
}
781
/// A [`RecordBatchStream`] that is pending forever.
#[derive(Debug)]
pub struct BlockingStream {
    /// Schema mocked by this stream.
    schema: SchemaRef,

    /// Ref-counting helper to check if the stream is still in memory.
    _refs: Arc<()>,
}

impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    /// Always `Pending`; never registers a waker, so the consuming task
    /// stays blocked until it is dropped / cancelled externally
    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Pending
    }
}

impl RecordBatchStream for BlockingStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
808
809/// Asserts that the strong count of the given [`Weak`] pointer converges to zero.
810///
811/// This might take a while but has a timeout.
812pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
813    tokio::time::timeout(std::time::Duration::from_secs(10), async {
814        loop {
815            if dbg!(Weak::strong_count(&refs)) == 0 {
816                break;
817            }
818            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
819        }
820    })
821    .await
822    .unwrap();
823}
824
825/// Execution plan that emits streams that panics.
826///
827/// This is useful to test panic handling of certain execution plans.
828#[derive(Debug)]
829pub struct PanicExec {
830    /// Schema that is mocked by this plan.
831    schema: SchemaRef,
832
833    /// Number of output partitions. Each partition will produce this
834    /// many empty output record batches prior to panicking
835    batches_until_panics: Vec<usize>,
836    cache: Arc<PlanProperties>,
837}
838
839impl PanicExec {
840    /// Create new [`PanicExec`] with a give schema and number of
841    /// partitions, which will each panic immediately.
842    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
843        let batches_until_panics = vec![0; n_partitions];
844        let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
845        Self {
846            schema,
847            batches_until_panics,
848            cache: Arc::new(cache),
849        }
850    }
851
852    /// Set the number of batches prior to panic for a partition
853    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
854        self.batches_until_panics[partition] = count;
855        self
856    }
857
858    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
859    fn compute_properties(
860        schema: SchemaRef,
861        batches_until_panics: &[usize],
862    ) -> PlanProperties {
863        let num_partitions = batches_until_panics.len();
864        PlanProperties::new(
865            EquivalenceProperties::new(schema),
866            Partitioning::UnknownPartitioning(num_partitions),
867            EmissionType::Incremental,
868            Boundedness::Bounded,
869        )
870    }
871}
872
873impl DisplayAs for PanicExec {
874    fn fmt_as(
875        &self,
876        t: DisplayFormatType,
877        f: &mut std::fmt::Formatter,
878    ) -> std::fmt::Result {
879        match t {
880            DisplayFormatType::Default | DisplayFormatType::Verbose => {
881                write!(f, "PanicExec",)
882            }
883            DisplayFormatType::TreeRender => {
884                // TODO: collect info
885                write!(f, "")
886            }
887        }
888    }
889}
890
impl ExecutionPlan for PanicExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // this is a leaf node and has no children
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("Children cannot be replaced in {:?}", self)
    }

    /// Returns a [`PanicStream`] for the requested partition; it yields
    /// `batches_until_panics[partition]` empty batches and then panics
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(PanicStream {
            partition,
            batches_until_panic: self.batches_until_panics[partition],
            schema: Arc::clone(&self.schema),
            ready: false,
        }))
    }
}
929
/// A [`RecordBatchStream`] that yields an empty batch on every other
/// poll and panics after `batches_until_panic` batches have been
/// produced.
///
/// Useful for testing the behavior of streams on panic
#[derive(Debug)]
struct PanicStream {
    /// Which partition was this
    partition: usize,
    /// How many batches will be produced until panic
    batches_until_panic: usize,
    /// Schema mocked by this stream.
    schema: SchemaRef,
    /// Should the next poll return `Ready`? Alternates so consumers must
    /// correctly handle `Poll::Pending` between batches
    ready: bool,
}

impl Stream for PanicStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.batches_until_panic > 0 {
            if self.ready {
                // produce one empty batch and arm the next Pending cycle
                self.batches_until_panic -= 1;
                self.ready = false;
                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
                return Poll::Ready(Some(Ok(batch)));
            } else {
                self.ready = true;
                // wake ourselves immediately so we get called again
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
        // batch budget exhausted: simulate a panicking operator
        panic!("PanickingStream did panic: {}", self.partition)
    }
}

impl RecordBatchStream for PanicStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}