// datafusion_physical_plan/test/exec.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Simple iterator over batches for use in testing
19
20use crate::{
21    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
22    RecordBatchStream, SendableRecordBatchStream, Statistics, common,
23    execution_plan::Boundedness,
24};
25use crate::{
26    execution_plan::EmissionType,
27    stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
28};
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::{
31    any::Any,
32    pin::Pin,
33    sync::{Arc, Weak},
34    task::{Context, Poll},
35};
36
37use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
38use arrow::record_batch::RecordBatch;
39use datafusion_common::{DataFusionError, Result, internal_err};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::EquivalenceProperties;
42
43use futures::Stream;
44use tokio::sync::Barrier;
45
/// Index into the data that has been returned so far
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    inner: Arc<std::sync::Mutex<usize>>,
}

impl BatchIndex {
    /// Return the current index
    pub fn value(&self) -> usize {
        *self.inner.lock().unwrap()
    }

    /// Increment the current index by one
    pub fn incr(&self) {
        *self.inner.lock().unwrap() += 1;
    }
}
65
66/// Iterator over batches
67#[derive(Debug, Default)]
68pub struct TestStream {
69    /// Vector of record batches
70    data: Vec<RecordBatch>,
71    /// Index into the data that has been returned so far
72    index: BatchIndex,
73}
74
75impl TestStream {
76    /// Create an iterator for a vector of record batches. Assumes at
77    /// least one entry in data (for the schema)
78    pub fn new(data: Vec<RecordBatch>) -> Self {
79        Self {
80            data,
81            ..Default::default()
82        }
83    }
84
85    /// Return a handle to the index counter for this stream
86    pub fn index(&self) -> BatchIndex {
87        self.index.clone()
88    }
89}
90
91impl Stream for TestStream {
92    type Item = Result<RecordBatch>;
93
94    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95        let next_batch = self.index.value();
96
97        Poll::Ready(if next_batch < self.data.len() {
98            let next_batch = self.index.value();
99            self.index.incr();
100            Some(Ok(self.data[next_batch].clone()))
101        } else {
102            None
103        })
104    }
105
106    fn size_hint(&self) -> (usize, Option<usize>) {
107        (self.data.len(), Some(self.data.len()))
108    }
109}
110
impl RecordBatchStream for TestStream {
    /// Get the schema
    ///
    /// Panics if the stream was created with no batches: the schema is taken
    /// from the first batch, per the `TestStream::new` contract.
    fn schema(&self) -> SchemaRef {
        self.data[0].schema()
    }
}
117
/// A Mock ExecutionPlan that can be used for writing tests of other
/// ExecutionPlans
#[derive(Debug)]
pub struct MockExec {
    /// the results to send back
    data: Vec<Result<RecordBatch>>,
    /// schema reported by this plan (also used for statistics)
    schema: SchemaRef,
    /// if true (the default), sends data using a separate task to ensure the
    /// batches are not available without this stream yielding first
    use_task: bool,
    /// cached plan properties (schema, partitioning, etc.)
    cache: PlanProperties,
}
130
131impl MockExec {
132    /// Create a new `MockExec` with a single partition that returns
133    /// the specified `Results`s.
134    ///
135    /// By default, the batches are not produced immediately (the
136    /// caller has to actually yield and another task must run) to
137    /// ensure any poll loops are correct. This behavior can be
138    /// changed with `with_use_task`
139    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
140        let cache = Self::compute_properties(Arc::clone(&schema));
141        Self {
142            data,
143            schema,
144            use_task: true,
145            cache,
146        }
147    }
148
149    /// If `use_task` is true (the default) then the batches are sent
150    /// back using a separate task to ensure the underlying stream is
151    /// not immediately ready
152    pub fn with_use_task(mut self, use_task: bool) -> Self {
153        self.use_task = use_task;
154        self
155    }
156
157    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
158    fn compute_properties(schema: SchemaRef) -> PlanProperties {
159        PlanProperties::new(
160            EquivalenceProperties::new(schema),
161            Partitioning::UnknownPartitioning(1),
162            EmissionType::Incremental,
163            Boundedness::Bounded,
164        )
165    }
166}
167
168impl DisplayAs for MockExec {
169    fn fmt_as(
170        &self,
171        t: DisplayFormatType,
172        f: &mut std::fmt::Formatter,
173    ) -> std::fmt::Result {
174        match t {
175            DisplayFormatType::Default | DisplayFormatType::Verbose => {
176                write!(f, "MockExec")
177            }
178            DisplayFormatType::TreeRender => {
179                // TODO: collect info
180                write!(f, "")
181            }
182        }
183    }
184}
185
impl ExecutionPlan for MockExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // leaf node: no children
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // MockExec is only used as a leaf in tests, so this is never called
        unimplemented!()
    }

    /// Returns a stream which yields data
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // single-partition plan (see `compute_properties`)
        assert_eq!(partition, 0);

        // Result doesn't implement clone, so do it ourself
        let data: Vec<_> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        if self.use_task {
            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
            // send data in order but in a separate task (to ensure
            // the batches are not available without the stream
            // yielding).
            let tx = builder.tx();
            builder.spawn(async move {
                for batch in data {
                    println!("Sending batch via delayed stream");
                    if let Err(e) = tx.send(batch).await {
                        println!("ERROR batch via delayed stream: {e}");
                    }
                }

                Ok(())
            });
            // returned stream simply reads off the rx stream
            Ok(builder.build())
        } else {
            // make an input that will error
            let stream = futures::stream::iter(data);
            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                stream,
            )))
        }
    }

    // Returns an error if one of the stored batches is an error (the
    // original comment said "Panics", but the error is propagated with
    // `?` in `partition_statistics` below, not by panicking)
    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        // per-partition statistics are not tracked; only whole-plan
        // statistics are computed from the data
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        // clone the batches, propagating any stored error with `?`
        let data: Result<Vec<_>> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        let data = data?;

        Ok(common::compute_record_batch_statistics(
            &[data],
            &self.schema,
            None,
        ))
    }
}
283
284fn clone_error(e: &DataFusionError) -> DataFusionError {
285    use DataFusionError::*;
286    match e {
287        Execution(msg) => Execution(msg.to_string()),
288        _ => unimplemented!(),
289    }
290}
291
/// A Mock ExecutionPlan that does not start producing input until a
/// barrier is called
#[derive(Debug)]
pub struct BarrierExec {
    /// partitions to send back
    data: Vec<Vec<RecordBatch>>,
    /// schema reported by this plan
    schema: SchemaRef,

    /// all streams wait on this barrier to produce
    start_data_barrier: Option<Arc<Barrier>>,

    /// the stream wait for this to return Poll::Ready(None);
    /// the counter records how many partitions have reached the barrier
    finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,

    /// cached plan properties (one partition per entry in `data`)
    cache: PlanProperties,

    /// if true, print progress messages to stdout while streams run
    log: bool,
}
310
311impl BarrierExec {
312    /// Create a new exec with some number of partitions.
313    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
314        // wait for all streams and the input
315        let barrier = Some(Arc::new(Barrier::new(data.len() + 1)));
316        let cache = Self::compute_properties(Arc::clone(&schema), &data);
317        Self {
318            data,
319            schema,
320            start_data_barrier: barrier,
321            cache,
322            finish_barrier: None,
323            log: true,
324        }
325    }
326
327    pub fn with_log(mut self, log: bool) -> Self {
328        self.log = log;
329        self
330    }
331
332    pub fn without_start_barrier(mut self) -> Self {
333        self.start_data_barrier = None;
334        self
335    }
336
337    pub fn with_finish_barrier(mut self) -> Self {
338        let barrier = Arc::new((
339            // wait for all streams and the input
340            Barrier::new(self.data.len() + 1),
341            AtomicUsize::new(0),
342        ));
343
344        self.finish_barrier = Some(barrier);
345        self
346    }
347
348    /// wait until all the input streams and this function is ready
349    pub async fn wait(&self) {
350        let barrier = &self
351            .start_data_barrier
352            .as_ref()
353            .expect("Must only be called when having a start barrier");
354        if self.log {
355            println!("BarrierExec::wait waiting on barrier");
356        }
357        barrier.wait().await;
358        if self.log {
359            println!("BarrierExec::wait done waiting");
360        }
361    }
362
363    pub async fn wait_finish(&self) {
364        let (barrier, _) = &self
365            .finish_barrier
366            .as_deref()
367            .expect("Must only be called when having a finish barrier");
368
369        if self.log {
370            println!("BarrierExec::wait_finish waiting on barrier");
371        }
372        barrier.wait().await;
373        if self.log {
374            println!("BarrierExec::wait_finish done waiting");
375        }
376    }
377
378    /// Return true if the finish barrier has been reached in all partitions
379    pub fn is_finish_barrier_reached(&self) -> bool {
380        let (_, reached_finish) = self
381            .finish_barrier
382            .as_deref()
383            .expect("Must only be called when having finish barrier");
384
385        reached_finish.load(Ordering::Relaxed) == self.data.len()
386    }
387
388    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
389    fn compute_properties(
390        schema: SchemaRef,
391        data: &[Vec<RecordBatch>],
392    ) -> PlanProperties {
393        PlanProperties::new(
394            EquivalenceProperties::new(schema),
395            Partitioning::UnknownPartitioning(data.len()),
396            EmissionType::Incremental,
397            Boundedness::Bounded,
398        )
399    }
400}
401
402impl DisplayAs for BarrierExec {
403    fn fmt_as(
404        &self,
405        t: DisplayFormatType,
406        f: &mut std::fmt::Formatter,
407    ) -> std::fmt::Result {
408        match t {
409            DisplayFormatType::Default | DisplayFormatType::Verbose => {
410                write!(f, "BarrierExec")
411            }
412            DisplayFormatType::TreeRender => {
413                // TODO: collect info
414                write!(f, "")
415            }
416        }
417    }
418}
419
420impl ExecutionPlan for BarrierExec {
421    fn name(&self) -> &'static str {
422        Self::static_name()
423    }
424
425    fn as_any(&self) -> &dyn Any {
426        self
427    }
428
429    fn properties(&self) -> &PlanProperties {
430        &self.cache
431    }
432
433    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
434        unimplemented!()
435    }
436
437    fn with_new_children(
438        self: Arc<Self>,
439        _: Vec<Arc<dyn ExecutionPlan>>,
440    ) -> Result<Arc<dyn ExecutionPlan>> {
441        unimplemented!()
442    }
443
444    /// Returns a stream which yields data
445    fn execute(
446        &self,
447        partition: usize,
448        _context: Arc<TaskContext>,
449    ) -> Result<SendableRecordBatchStream> {
450        assert!(partition < self.data.len());
451
452        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
453
454        // task simply sends data in order after barrier is reached
455        let data = self.data[partition].clone();
456        let start_barrier = self.start_data_barrier.as_ref().map(Arc::clone);
457        let finish_barrier = self.finish_barrier.as_ref().map(Arc::clone);
458        let log = self.log;
459        let tx = builder.tx();
460        builder.spawn(async move {
461            if let Some(barrier) = start_barrier {
462                if log {
463                    println!("Partition {partition} waiting on barrier");
464                }
465                barrier.wait().await;
466            }
467            for batch in data {
468                if log {
469                    println!("Partition {partition} sending batch");
470                }
471                if let Err(e) = tx.send(Ok(batch)).await {
472                    println!("ERROR batch via barrier stream stream: {e}");
473                }
474            }
475            if let Some((barrier, reached_finish)) = finish_barrier.as_deref() {
476                if log {
477                    println!("Partition {partition} waiting on finish barrier");
478                }
479                reached_finish.fetch_add(1, Ordering::Relaxed);
480                barrier.wait().await;
481            }
482
483            Ok(())
484        });
485
486        // returned stream simply reads off the rx stream
487        Ok(builder.build())
488    }
489
490    fn statistics(&self) -> Result<Statistics> {
491        self.partition_statistics(None)
492    }
493
494    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
495        if partition.is_some() {
496            return Ok(Statistics::new_unknown(&self.schema));
497        }
498        Ok(common::compute_record_batch_statistics(
499            &self.data,
500            &self.schema,
501            None,
502        ))
503    }
504}
505
/// A mock execution plan that errors on a call to execute
#[derive(Debug)]
pub struct ErrorExec {
    /// cached plan properties (dummy single-column Int64 schema)
    cache: PlanProperties,
}

impl Default for ErrorExec {
    fn default() -> Self {
        Self::new()
    }
}
517
518impl ErrorExec {
519    pub fn new() -> Self {
520        let schema = Arc::new(Schema::new(vec![Field::new(
521            "dummy",
522            DataType::Int64,
523            true,
524        )]));
525        let cache = Self::compute_properties(schema);
526        Self { cache }
527    }
528
529    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
530    fn compute_properties(schema: SchemaRef) -> PlanProperties {
531        PlanProperties::new(
532            EquivalenceProperties::new(schema),
533            Partitioning::UnknownPartitioning(1),
534            EmissionType::Incremental,
535            Boundedness::Bounded,
536        )
537    }
538}
539
540impl DisplayAs for ErrorExec {
541    fn fmt_as(
542        &self,
543        t: DisplayFormatType,
544        f: &mut std::fmt::Formatter,
545    ) -> std::fmt::Result {
546        match t {
547            DisplayFormatType::Default | DisplayFormatType::Verbose => {
548                write!(f, "ErrorExec")
549            }
550            DisplayFormatType::TreeRender => {
551                // TODO: collect info
552                write!(f, "")
553            }
554        }
555    }
556}
557
558impl ExecutionPlan for ErrorExec {
559    fn name(&self) -> &'static str {
560        Self::static_name()
561    }
562
563    fn as_any(&self) -> &dyn Any {
564        self
565    }
566
567    fn properties(&self) -> &PlanProperties {
568        &self.cache
569    }
570
571    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
572        unimplemented!()
573    }
574
575    fn with_new_children(
576        self: Arc<Self>,
577        _: Vec<Arc<dyn ExecutionPlan>>,
578    ) -> Result<Arc<dyn ExecutionPlan>> {
579        unimplemented!()
580    }
581
582    /// Returns a stream which yields data
583    fn execute(
584        &self,
585        partition: usize,
586        _context: Arc<TaskContext>,
587    ) -> Result<SendableRecordBatchStream> {
588        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
589    }
590}
591
/// A mock execution plan that simply returns the provided statistics
#[derive(Debug, Clone)]
pub struct StatisticsExec {
    /// statistics returned by `statistics` / `partition_statistics`
    stats: Statistics,
    /// schema the statistics describe
    schema: Arc<Schema>,
    /// cached plan properties (reports 2 unknown partitions)
    cache: PlanProperties,
}
599impl StatisticsExec {
600    pub fn new(stats: Statistics, schema: Schema) -> Self {
601        assert_eq!(
602            stats.column_statistics.len(),
603            schema.fields().len(),
604            "if defined, the column statistics vector length should be the number of fields"
605        );
606        let cache = Self::compute_properties(Arc::new(schema.clone()));
607        Self {
608            stats,
609            schema: Arc::new(schema),
610            cache,
611        }
612    }
613
614    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
615    fn compute_properties(schema: SchemaRef) -> PlanProperties {
616        PlanProperties::new(
617            EquivalenceProperties::new(schema),
618            Partitioning::UnknownPartitioning(2),
619            EmissionType::Incremental,
620            Boundedness::Bounded,
621        )
622    }
623}
624
625impl DisplayAs for StatisticsExec {
626    fn fmt_as(
627        &self,
628        t: DisplayFormatType,
629        f: &mut std::fmt::Formatter,
630    ) -> std::fmt::Result {
631        match t {
632            DisplayFormatType::Default | DisplayFormatType::Verbose => {
633                write!(
634                    f,
635                    "StatisticsExec: col_count={}, row_count={:?}",
636                    self.schema.fields().len(),
637                    self.stats.num_rows,
638                )
639            }
640            DisplayFormatType::TreeRender => {
641                // TODO: collect info
642                write!(f, "")
643            }
644        }
645    }
646}
647
648impl ExecutionPlan for StatisticsExec {
649    fn name(&self) -> &'static str {
650        Self::static_name()
651    }
652
653    fn as_any(&self) -> &dyn Any {
654        self
655    }
656
657    fn properties(&self) -> &PlanProperties {
658        &self.cache
659    }
660
661    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
662        vec![]
663    }
664
665    fn with_new_children(
666        self: Arc<Self>,
667        _: Vec<Arc<dyn ExecutionPlan>>,
668    ) -> Result<Arc<dyn ExecutionPlan>> {
669        Ok(self)
670    }
671
672    fn execute(
673        &self,
674        _partition: usize,
675        _context: Arc<TaskContext>,
676    ) -> Result<SendableRecordBatchStream> {
677        unimplemented!("This plan only serves for testing statistics")
678    }
679
680    fn statistics(&self) -> Result<Statistics> {
681        Ok(self.stats.clone())
682    }
683
684    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
685        Ok(if partition.is_some() {
686            Statistics::new_unknown(&self.schema)
687        } else {
688            self.stats.clone()
689        })
690    }
691}
692
/// Execution plan that emits streams that block forever.
///
/// This is useful to test shutdown / cancellation behavior of certain execution plans.
#[derive(Debug)]
pub struct BlockingExec {
    /// Schema that is mocked by this plan.
    schema: SchemaRef,

    /// Ref-counting helper to check if the plan and the produced stream are still in memory.
    refs: Arc<()>,
    /// cached plan properties
    cache: PlanProperties,
}
705
706impl BlockingExec {
707    /// Create new [`BlockingExec`] with a give schema and number of partitions.
708    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
709        let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
710        Self {
711            schema,
712            refs: Default::default(),
713            cache,
714        }
715    }
716
717    /// Weak pointer that can be used for ref-counting this execution plan and its streams.
718    ///
719    /// Use [`Weak::strong_count`] to determine if the plan itself and its streams are dropped (should be 0 in that
720    /// case). Note that tokio might take some time to cancel spawned tasks, so you need to wrap this check into a retry
721    /// loop. Use [`assert_strong_count_converges_to_zero`] to archive this.
722    pub fn refs(&self) -> Weak<()> {
723        Arc::downgrade(&self.refs)
724    }
725
726    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
727    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
728        PlanProperties::new(
729            EquivalenceProperties::new(schema),
730            Partitioning::UnknownPartitioning(n_partitions),
731            EmissionType::Incremental,
732            Boundedness::Bounded,
733        )
734    }
735}
736
737impl DisplayAs for BlockingExec {
738    fn fmt_as(
739        &self,
740        t: DisplayFormatType,
741        f: &mut std::fmt::Formatter,
742    ) -> std::fmt::Result {
743        match t {
744            DisplayFormatType::Default | DisplayFormatType::Verbose => {
745                write!(f, "BlockingExec",)
746            }
747            DisplayFormatType::TreeRender => {
748                // TODO: collect info
749                write!(f, "")
750            }
751        }
752    }
753}
754
755impl ExecutionPlan for BlockingExec {
756    fn name(&self) -> &'static str {
757        Self::static_name()
758    }
759
760    fn as_any(&self) -> &dyn Any {
761        self
762    }
763
764    fn properties(&self) -> &PlanProperties {
765        &self.cache
766    }
767
768    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
769        // this is a leaf node and has no children
770        vec![]
771    }
772
773    fn with_new_children(
774        self: Arc<Self>,
775        _: Vec<Arc<dyn ExecutionPlan>>,
776    ) -> Result<Arc<dyn ExecutionPlan>> {
777        internal_err!("Children cannot be replaced in {self:?}")
778    }
779
780    fn execute(
781        &self,
782        _partition: usize,
783        _context: Arc<TaskContext>,
784    ) -> Result<SendableRecordBatchStream> {
785        Ok(Box::pin(BlockingStream {
786            schema: Arc::clone(&self.schema),
787            _refs: Arc::clone(&self.refs),
788        }))
789    }
790}
791
/// A [`RecordBatchStream`] that is pending forever.
#[derive(Debug)]
pub struct BlockingStream {
    /// Schema mocked by this stream.
    schema: SchemaRef,

    /// Ref-counting helper to check if the stream is still in memory.
    _refs: Arc<()>,
}
801
impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    /// Always returns `Poll::Pending` and never registers the waker, so this
    /// stream is never woken and blocks forever — by design, to exercise
    /// shutdown / cancellation paths.
    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Pending
    }
}
812
impl RecordBatchStream for BlockingStream {
    /// Return the schema this stream was created with
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
818
819/// Asserts that the strong count of the given [`Weak`] pointer converges to zero.
820///
821/// This might take a while but has a timeout.
822pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
823    tokio::time::timeout(std::time::Duration::from_secs(10), async {
824        loop {
825            if dbg!(Weak::strong_count(&refs)) == 0 {
826                break;
827            }
828            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
829        }
830    })
831    .await
832    .unwrap();
833}
834
/// Execution plan that emits streams that panics.
///
/// This is useful to test panic handling of certain execution plans.
#[derive(Debug)]
pub struct PanicExec {
    /// Schema that is mocked by this plan.
    schema: SchemaRef,

    /// Number of output partitions. Each partition will produce this
    /// many empty output record batches prior to panicking
    batches_until_panics: Vec<usize>,
    /// cached plan properties (one partition per entry above)
    cache: PlanProperties,
}
848
849impl PanicExec {
850    /// Create new [`PanicExec`] with a give schema and number of
851    /// partitions, which will each panic immediately.
852    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
853        let batches_until_panics = vec![0; n_partitions];
854        let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
855        Self {
856            schema,
857            batches_until_panics,
858            cache,
859        }
860    }
861
862    /// Set the number of batches prior to panic for a partition
863    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
864        self.batches_until_panics[partition] = count;
865        self
866    }
867
868    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
869    fn compute_properties(
870        schema: SchemaRef,
871        batches_until_panics: &[usize],
872    ) -> PlanProperties {
873        let num_partitions = batches_until_panics.len();
874        PlanProperties::new(
875            EquivalenceProperties::new(schema),
876            Partitioning::UnknownPartitioning(num_partitions),
877            EmissionType::Incremental,
878            Boundedness::Bounded,
879        )
880    }
881}
882
883impl DisplayAs for PanicExec {
884    fn fmt_as(
885        &self,
886        t: DisplayFormatType,
887        f: &mut std::fmt::Formatter,
888    ) -> std::fmt::Result {
889        match t {
890            DisplayFormatType::Default | DisplayFormatType::Verbose => {
891                write!(f, "PanicExec",)
892            }
893            DisplayFormatType::TreeRender => {
894                // TODO: collect info
895                write!(f, "")
896            }
897        }
898    }
899}
900
901impl ExecutionPlan for PanicExec {
902    fn name(&self) -> &'static str {
903        Self::static_name()
904    }
905
906    fn as_any(&self) -> &dyn Any {
907        self
908    }
909
910    fn properties(&self) -> &PlanProperties {
911        &self.cache
912    }
913
914    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
915        // this is a leaf node and has no children
916        vec![]
917    }
918
919    fn with_new_children(
920        self: Arc<Self>,
921        _: Vec<Arc<dyn ExecutionPlan>>,
922    ) -> Result<Arc<dyn ExecutionPlan>> {
923        internal_err!("Children cannot be replaced in {:?}", self)
924    }
925
926    fn execute(
927        &self,
928        partition: usize,
929        _context: Arc<TaskContext>,
930    ) -> Result<SendableRecordBatchStream> {
931        Ok(Box::pin(PanicStream {
932            partition,
933            batches_until_panic: self.batches_until_panics[partition],
934            schema: Arc::clone(&self.schema),
935            ready: false,
936        }))
937    }
938}
939
/// A [`RecordBatchStream`] that yields every other batch and panics
/// after `batches_until_panic` batches have been produced.
///
/// Useful for testing the behavior of streams on panic
#[derive(Debug)]
struct PanicStream {
    /// Which partition was this
    partition: usize,
    /// How many batches will be produced until panic
    batches_until_panic: usize,
    /// Schema mocked by this stream.
    schema: SchemaRef,
    /// Should we return ready ?
    ready: bool,
}
955
956impl Stream for PanicStream {
957    type Item = Result<RecordBatch>;
958
959    fn poll_next(
960        mut self: Pin<&mut Self>,
961        cx: &mut Context<'_>,
962    ) -> Poll<Option<Self::Item>> {
963        if self.batches_until_panic > 0 {
964            if self.ready {
965                self.batches_until_panic -= 1;
966                self.ready = false;
967                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
968                return Poll::Ready(Some(Ok(batch)));
969            } else {
970                self.ready = true;
971                // get called again
972                cx.waker().wake_by_ref();
973                return Poll::Pending;
974            }
975        }
976        panic!("PanickingStream did panic: {}", self.partition)
977    }
978}
979
impl RecordBatchStream for PanicStream {
    /// Return the schema this stream was created with
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}