Skip to main content

datafusion_physical_plan/test/
exec.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Simple iterator over batches and mock execution plans for use in testing
19
20use crate::{
21    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
22    RecordBatchStream, SendableRecordBatchStream, Statistics, common,
23    execution_plan::Boundedness,
24};
25use crate::{
26    execution_plan::EmissionType,
27    stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
28};
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::{
31    pin::Pin,
32    sync::{Arc, Weak},
33    task::{Context, Poll},
34};
35
36use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
37use arrow::record_batch::RecordBatch;
38use datafusion_common::{DataFusionError, Result, internal_err};
39use datafusion_execution::TaskContext;
40use datafusion_physical_expr::EquivalenceProperties;
41
42use futures::Stream;
43use tokio::sync::Barrier;
44
/// Shared counter tracking how many batches a [`TestStream`] has returned.
///
/// Cloning a `BatchIndex` yields another handle to the *same* counter, so a
/// test can keep a clone and observe progress while the stream is consumed.
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    inner: Arc<std::sync::Mutex<usize>>,
}

impl BatchIndex {
    /// Returns the number of batches returned so far, which is also the
    /// position of the next batch to return.
    pub fn value(&self) -> usize {
        *self.inner.lock().unwrap()
    }

    /// Advances the counter by one.
    pub fn incr(&self) {
        *self.inner.lock().unwrap() += 1;
    }
}
64
65/// Iterator over batches
66#[derive(Debug, Default)]
67pub struct TestStream {
68    /// Vector of record batches
69    data: Vec<RecordBatch>,
70    /// Index into the data that has been returned so far
71    index: BatchIndex,
72}
73
74impl TestStream {
75    /// Create an iterator for a vector of record batches. Assumes at
76    /// least one entry in data (for the schema)
77    pub fn new(data: Vec<RecordBatch>) -> Self {
78        Self {
79            data,
80            ..Default::default()
81        }
82    }
83
84    /// Return a handle to the index counter for this stream
85    pub fn index(&self) -> BatchIndex {
86        self.index.clone()
87    }
88}
89
90impl Stream for TestStream {
91    type Item = Result<RecordBatch>;
92
93    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
94        let next_batch = self.index.value();
95
96        Poll::Ready(if next_batch < self.data.len() {
97            let next_batch = self.index.value();
98            self.index.incr();
99            Some(Ok(self.data[next_batch].clone()))
100        } else {
101            None
102        })
103    }
104
105    fn size_hint(&self) -> (usize, Option<usize>) {
106        (self.data.len(), Some(self.data.len()))
107    }
108}
109
110impl RecordBatchStream for TestStream {
111    /// Get the schema
112    fn schema(&self) -> SchemaRef {
113        self.data[0].schema()
114    }
115}
116
117/// A Mock ExecutionPlan that can be used for writing tests of other
118/// ExecutionPlans
119#[derive(Debug)]
120pub struct MockExec {
121    /// the results to send back
122    data: Vec<Result<RecordBatch>>,
123    schema: SchemaRef,
124    /// if true (the default), sends data using a separate task to ensure the
125    /// batches are not available without this stream yielding first
126    use_task: bool,
127    cache: Arc<PlanProperties>,
128}
129
130impl MockExec {
131    /// Create a new `MockExec` with a single partition that returns
132    /// the specified `Results`s.
133    ///
134    /// By default, the batches are not produced immediately (the
135    /// caller has to actually yield and another task must run) to
136    /// ensure any poll loops are correct. This behavior can be
137    /// changed with `with_use_task`
138    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
139        let cache = Self::compute_properties(Arc::clone(&schema));
140        Self {
141            data,
142            schema,
143            use_task: true,
144            cache: Arc::new(cache),
145        }
146    }
147
148    /// If `use_task` is true (the default) then the batches are sent
149    /// back using a separate task to ensure the underlying stream is
150    /// not immediately ready
151    pub fn with_use_task(mut self, use_task: bool) -> Self {
152        self.use_task = use_task;
153        self
154    }
155
156    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
157    fn compute_properties(schema: SchemaRef) -> PlanProperties {
158        PlanProperties::new(
159            EquivalenceProperties::new(schema),
160            Partitioning::UnknownPartitioning(1),
161            EmissionType::Incremental,
162            Boundedness::Bounded,
163        )
164    }
165}
166
167impl DisplayAs for MockExec {
168    fn fmt_as(
169        &self,
170        t: DisplayFormatType,
171        f: &mut std::fmt::Formatter,
172    ) -> std::fmt::Result {
173        match t {
174            DisplayFormatType::Default | DisplayFormatType::Verbose => {
175                write!(f, "MockExec")
176            }
177            DisplayFormatType::TreeRender => {
178                // TODO: collect info
179                write!(f, "")
180            }
181        }
182    }
183}
184
185impl ExecutionPlan for MockExec {
186    fn name(&self) -> &'static str {
187        Self::static_name()
188    }
189
190    fn properties(&self) -> &Arc<PlanProperties> {
191        &self.cache
192    }
193
194    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
195        vec![]
196    }
197
198    fn with_new_children(
199        self: Arc<Self>,
200        _: Vec<Arc<dyn ExecutionPlan>>,
201    ) -> Result<Arc<dyn ExecutionPlan>> {
202        unimplemented!()
203    }
204
205    /// Returns a stream which yields data
206    fn execute(
207        &self,
208        partition: usize,
209        _context: Arc<TaskContext>,
210    ) -> Result<SendableRecordBatchStream> {
211        assert_eq!(partition, 0);
212
213        // Result doesn't implement clone, so do it ourself
214        let data: Vec<_> = self
215            .data
216            .iter()
217            .map(|r| match r {
218                Ok(batch) => Ok(batch.clone()),
219                Err(e) => Err(clone_error(e)),
220            })
221            .collect();
222
223        if self.use_task {
224            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
225            // send data in order but in a separate task (to ensure
226            // the batches are not available without the stream
227            // yielding).
228            let tx = builder.tx();
229            builder.spawn(async move {
230                for batch in data {
231                    println!("Sending batch via delayed stream");
232                    if let Err(e) = tx.send(batch).await {
233                        println!("ERROR batch via delayed stream: {e}");
234                    }
235                }
236
237                Ok(())
238            });
239            // returned stream simply reads off the rx stream
240            Ok(builder.build())
241        } else {
242            // make an input that will error
243            let stream = futures::stream::iter(data);
244            Ok(Box::pin(RecordBatchStreamAdapter::new(
245                self.schema(),
246                stream,
247            )))
248        }
249    }
250
251    // Panics if one of the batches is an error
252    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
253        if partition.is_some() {
254            return Ok(Arc::new(Statistics::new_unknown(&self.schema)));
255        }
256        let data: Result<Vec<_>> = self
257            .data
258            .iter()
259            .map(|r| match r {
260                Ok(batch) => Ok(batch.clone()),
261                Err(e) => Err(clone_error(e)),
262            })
263            .collect();
264
265        let data = data?;
266
267        Ok(Arc::new(common::compute_record_batch_statistics(
268            &[data],
269            &self.schema,
270            None,
271        )))
272    }
273}
274
275fn clone_error(e: &DataFusionError) -> DataFusionError {
276    use DataFusionError::*;
277    match e {
278        Execution(msg) => Execution(msg.to_string()),
279        _ => unimplemented!(),
280    }
281}
282
283/// A Mock ExecutionPlan that does not start producing input until a
284/// barrier is called
285#[derive(Debug)]
286pub struct BarrierExec {
287    /// partitions to send back
288    data: Vec<Vec<RecordBatch>>,
289    schema: SchemaRef,
290
291    /// all streams wait on this barrier to produce
292    start_data_barrier: Option<Arc<Barrier>>,
293
294    /// the stream wait for this to return Poll::Ready(None)
295    finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,
296
297    cache: Arc<PlanProperties>,
298
299    log: bool,
300}
301
302impl BarrierExec {
303    /// Create a new exec with some number of partitions.
304    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
305        // wait for all streams and the input
306        let barrier = Some(Arc::new(Barrier::new(data.len() + 1)));
307        let cache = Self::compute_properties(Arc::clone(&schema), &data);
308        Self {
309            data,
310            schema,
311            start_data_barrier: barrier,
312            cache: Arc::new(cache),
313            finish_barrier: None,
314            log: true,
315        }
316    }
317
318    pub fn with_log(mut self, log: bool) -> Self {
319        self.log = log;
320        self
321    }
322
323    pub fn without_start_barrier(mut self) -> Self {
324        self.start_data_barrier = None;
325        self
326    }
327
328    pub fn with_finish_barrier(mut self) -> Self {
329        let barrier = Arc::new((
330            // wait for all streams and the input
331            Barrier::new(self.data.len() + 1),
332            AtomicUsize::new(0),
333        ));
334
335        self.finish_barrier = Some(barrier);
336        self
337    }
338
339    /// wait until all the input streams and this function is ready
340    pub async fn wait(&self) {
341        let barrier = &self
342            .start_data_barrier
343            .as_ref()
344            .expect("Must only be called when having a start barrier");
345        if self.log {
346            println!("BarrierExec::wait waiting on barrier");
347        }
348        barrier.wait().await;
349        if self.log {
350            println!("BarrierExec::wait done waiting");
351        }
352    }
353
354    pub async fn wait_finish(&self) {
355        let (barrier, _) = &self
356            .finish_barrier
357            .as_deref()
358            .expect("Must only be called when having a finish barrier");
359
360        if self.log {
361            println!("BarrierExec::wait_finish waiting on barrier");
362        }
363        barrier.wait().await;
364        if self.log {
365            println!("BarrierExec::wait_finish done waiting");
366        }
367    }
368
369    /// Return true if the finish barrier has been reached in all partitions
370    pub fn is_finish_barrier_reached(&self) -> bool {
371        let (_, reached_finish) = self
372            .finish_barrier
373            .as_deref()
374            .expect("Must only be called when having finish barrier");
375
376        reached_finish.load(Ordering::Relaxed) == self.data.len()
377    }
378
379    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
380    fn compute_properties(
381        schema: SchemaRef,
382        data: &[Vec<RecordBatch>],
383    ) -> PlanProperties {
384        PlanProperties::new(
385            EquivalenceProperties::new(schema),
386            Partitioning::UnknownPartitioning(data.len()),
387            EmissionType::Incremental,
388            Boundedness::Bounded,
389        )
390    }
391}
392
393impl DisplayAs for BarrierExec {
394    fn fmt_as(
395        &self,
396        t: DisplayFormatType,
397        f: &mut std::fmt::Formatter,
398    ) -> std::fmt::Result {
399        match t {
400            DisplayFormatType::Default | DisplayFormatType::Verbose => {
401                write!(f, "BarrierExec")
402            }
403            DisplayFormatType::TreeRender => {
404                // TODO: collect info
405                write!(f, "")
406            }
407        }
408    }
409}
410
411impl ExecutionPlan for BarrierExec {
412    fn name(&self) -> &'static str {
413        Self::static_name()
414    }
415
416    fn properties(&self) -> &Arc<PlanProperties> {
417        &self.cache
418    }
419
420    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
421        unimplemented!()
422    }
423
424    fn with_new_children(
425        self: Arc<Self>,
426        _: Vec<Arc<dyn ExecutionPlan>>,
427    ) -> Result<Arc<dyn ExecutionPlan>> {
428        unimplemented!()
429    }
430
431    /// Returns a stream which yields data
432    fn execute(
433        &self,
434        partition: usize,
435        _context: Arc<TaskContext>,
436    ) -> Result<SendableRecordBatchStream> {
437        assert!(partition < self.data.len());
438
439        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
440
441        // task simply sends data in order after barrier is reached
442        let data = self.data[partition].clone();
443        let start_barrier = self.start_data_barrier.as_ref().map(Arc::clone);
444        let finish_barrier = self.finish_barrier.as_ref().map(Arc::clone);
445        let log = self.log;
446        let tx = builder.tx();
447        builder.spawn(async move {
448            if let Some(barrier) = start_barrier {
449                if log {
450                    println!("Partition {partition} waiting on barrier");
451                }
452                barrier.wait().await;
453            }
454            for batch in data {
455                if log {
456                    println!("Partition {partition} sending batch");
457                }
458                if let Err(e) = tx.send(Ok(batch)).await {
459                    println!("ERROR batch via barrier stream stream: {e}");
460                }
461            }
462            if let Some((barrier, reached_finish)) = finish_barrier.as_deref() {
463                if log {
464                    println!("Partition {partition} waiting on finish barrier");
465                }
466                reached_finish.fetch_add(1, Ordering::Relaxed);
467                barrier.wait().await;
468            }
469
470            Ok(())
471        });
472
473        // returned stream simply reads off the rx stream
474        Ok(builder.build())
475    }
476
477    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
478        if partition.is_some() {
479            return Ok(Arc::new(Statistics::new_unknown(&self.schema)));
480        }
481        Ok(Arc::new(common::compute_record_batch_statistics(
482            &self.data,
483            &self.schema,
484            None,
485        )))
486    }
487}
488
489/// A mock execution plan that errors on a call to execute
490#[derive(Debug)]
491pub struct ErrorExec {
492    cache: Arc<PlanProperties>,
493}
494
495impl Default for ErrorExec {
496    fn default() -> Self {
497        Self::new()
498    }
499}
500
501impl ErrorExec {
502    pub fn new() -> Self {
503        let schema = Arc::new(Schema::new(vec![Field::new(
504            "dummy",
505            DataType::Int64,
506            true,
507        )]));
508        let cache = Self::compute_properties(schema);
509        Self {
510            cache: Arc::new(cache),
511        }
512    }
513
514    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
515    fn compute_properties(schema: SchemaRef) -> PlanProperties {
516        PlanProperties::new(
517            EquivalenceProperties::new(schema),
518            Partitioning::UnknownPartitioning(1),
519            EmissionType::Incremental,
520            Boundedness::Bounded,
521        )
522    }
523}
524
525impl DisplayAs for ErrorExec {
526    fn fmt_as(
527        &self,
528        t: DisplayFormatType,
529        f: &mut std::fmt::Formatter,
530    ) -> std::fmt::Result {
531        match t {
532            DisplayFormatType::Default | DisplayFormatType::Verbose => {
533                write!(f, "ErrorExec")
534            }
535            DisplayFormatType::TreeRender => {
536                // TODO: collect info
537                write!(f, "")
538            }
539        }
540    }
541}
542
543impl ExecutionPlan for ErrorExec {
544    fn name(&self) -> &'static str {
545        Self::static_name()
546    }
547
548    fn properties(&self) -> &Arc<PlanProperties> {
549        &self.cache
550    }
551
552    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
553        unimplemented!()
554    }
555
556    fn with_new_children(
557        self: Arc<Self>,
558        _: Vec<Arc<dyn ExecutionPlan>>,
559    ) -> Result<Arc<dyn ExecutionPlan>> {
560        unimplemented!()
561    }
562
563    /// Returns a stream which yields data
564    fn execute(
565        &self,
566        partition: usize,
567        _context: Arc<TaskContext>,
568    ) -> Result<SendableRecordBatchStream> {
569        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
570    }
571}
572
573/// A mock execution plan that simply returns the provided statistics
574#[derive(Debug, Clone)]
575pub struct StatisticsExec {
576    stats: Statistics,
577    schema: Arc<Schema>,
578    cache: Arc<PlanProperties>,
579}
580impl StatisticsExec {
581    pub fn new(stats: Statistics, schema: Schema) -> Self {
582        assert_eq!(
583            stats.column_statistics.len(),
584            schema.fields().len(),
585            "if defined, the column statistics vector length should be the number of fields"
586        );
587        let cache = Self::compute_properties(Arc::new(schema.clone()));
588        Self {
589            stats,
590            schema: Arc::new(schema),
591            cache: Arc::new(cache),
592        }
593    }
594
595    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
596    fn compute_properties(schema: SchemaRef) -> PlanProperties {
597        PlanProperties::new(
598            EquivalenceProperties::new(schema),
599            Partitioning::UnknownPartitioning(2),
600            EmissionType::Incremental,
601            Boundedness::Bounded,
602        )
603    }
604}
605
606impl DisplayAs for StatisticsExec {
607    fn fmt_as(
608        &self,
609        t: DisplayFormatType,
610        f: &mut std::fmt::Formatter,
611    ) -> std::fmt::Result {
612        match t {
613            DisplayFormatType::Default | DisplayFormatType::Verbose => {
614                write!(
615                    f,
616                    "StatisticsExec: col_count={}, row_count={:?}",
617                    self.schema.fields().len(),
618                    self.stats.num_rows,
619                )
620            }
621            DisplayFormatType::TreeRender => {
622                // TODO: collect info
623                write!(f, "")
624            }
625        }
626    }
627}
628
629impl ExecutionPlan for StatisticsExec {
630    fn name(&self) -> &'static str {
631        Self::static_name()
632    }
633
634    fn properties(&self) -> &Arc<PlanProperties> {
635        &self.cache
636    }
637
638    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
639        vec![]
640    }
641
642    fn with_new_children(
643        self: Arc<Self>,
644        _: Vec<Arc<dyn ExecutionPlan>>,
645    ) -> Result<Arc<dyn ExecutionPlan>> {
646        Ok(self)
647    }
648
649    fn execute(
650        &self,
651        _partition: usize,
652        _context: Arc<TaskContext>,
653    ) -> Result<SendableRecordBatchStream> {
654        unimplemented!("This plan only serves for testing statistics")
655    }
656
657    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
658        Ok(Arc::new(if partition.is_some() {
659            Statistics::new_unknown(&self.schema)
660        } else {
661            self.stats.clone()
662        }))
663    }
664}
665
666/// Execution plan that emits streams that block forever.
667///
668/// This is useful to test shutdown / cancellation behavior of certain execution plans.
669#[derive(Debug)]
670pub struct BlockingExec {
671    /// Schema that is mocked by this plan.
672    schema: SchemaRef,
673
674    /// Ref-counting helper to check if the plan and the produced stream are still in memory.
675    refs: Arc<()>,
676    cache: Arc<PlanProperties>,
677}
678
679impl BlockingExec {
680    /// Create new [`BlockingExec`] with a give schema and number of partitions.
681    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
682        let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
683        Self {
684            schema,
685            refs: Default::default(),
686            cache: Arc::new(cache),
687        }
688    }
689
690    /// Weak pointer that can be used for ref-counting this execution plan and its streams.
691    ///
692    /// Use [`Weak::strong_count`] to determine if the plan itself and its streams are dropped (should be 0 in that
693    /// case). Note that tokio might take some time to cancel spawned tasks, so you need to wrap this check into a retry
694    /// loop. Use [`assert_strong_count_converges_to_zero`] to archive this.
695    pub fn refs(&self) -> Weak<()> {
696        Arc::downgrade(&self.refs)
697    }
698
699    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
700    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
701        PlanProperties::new(
702            EquivalenceProperties::new(schema),
703            Partitioning::UnknownPartitioning(n_partitions),
704            EmissionType::Incremental,
705            Boundedness::Bounded,
706        )
707    }
708}
709
710impl DisplayAs for BlockingExec {
711    fn fmt_as(
712        &self,
713        t: DisplayFormatType,
714        f: &mut std::fmt::Formatter,
715    ) -> std::fmt::Result {
716        match t {
717            DisplayFormatType::Default | DisplayFormatType::Verbose => {
718                write!(f, "BlockingExec",)
719            }
720            DisplayFormatType::TreeRender => {
721                // TODO: collect info
722                write!(f, "")
723            }
724        }
725    }
726}
727
728impl ExecutionPlan for BlockingExec {
729    fn name(&self) -> &'static str {
730        Self::static_name()
731    }
732
733    fn properties(&self) -> &Arc<PlanProperties> {
734        &self.cache
735    }
736
737    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
738        // this is a leaf node and has no children
739        vec![]
740    }
741
742    fn with_new_children(
743        self: Arc<Self>,
744        _: Vec<Arc<dyn ExecutionPlan>>,
745    ) -> Result<Arc<dyn ExecutionPlan>> {
746        internal_err!("Children cannot be replaced in {self:?}")
747    }
748
749    fn execute(
750        &self,
751        _partition: usize,
752        _context: Arc<TaskContext>,
753    ) -> Result<SendableRecordBatchStream> {
754        Ok(Box::pin(BlockingStream {
755            schema: Arc::clone(&self.schema),
756            _refs: Arc::clone(&self.refs),
757        }))
758    }
759}
760
761/// A [`RecordBatchStream`] that is pending forever.
762#[derive(Debug)]
763pub struct BlockingStream {
764    /// Schema mocked by this stream.
765    schema: SchemaRef,
766
767    /// Ref-counting helper to check if the stream are still in memory.
768    _refs: Arc<()>,
769}
770
771impl Stream for BlockingStream {
772    type Item = Result<RecordBatch>;
773
774    fn poll_next(
775        self: Pin<&mut Self>,
776        _cx: &mut Context<'_>,
777    ) -> Poll<Option<Self::Item>> {
778        Poll::Pending
779    }
780}
781
782impl RecordBatchStream for BlockingStream {
783    fn schema(&self) -> SchemaRef {
784        Arc::clone(&self.schema)
785    }
786}
787
788/// Asserts that the strong count of the given [`Weak`] pointer converges to zero.
789///
790/// This might take a while but has a timeout.
791pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
792    tokio::time::timeout(std::time::Duration::from_secs(10), async {
793        loop {
794            if dbg!(Weak::strong_count(&refs)) == 0 {
795                break;
796            }
797            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
798        }
799    })
800    .await
801    .unwrap();
802}
803
804/// Execution plan that emits streams that panics.
805///
806/// This is useful to test panic handling of certain execution plans.
807#[derive(Debug)]
808pub struct PanicExec {
809    /// Schema that is mocked by this plan.
810    schema: SchemaRef,
811
812    /// Number of output partitions. Each partition will produce this
813    /// many empty output record batches prior to panicking
814    batches_until_panics: Vec<usize>,
815    cache: Arc<PlanProperties>,
816}
817
818impl PanicExec {
819    /// Create new [`PanicExec`] with a give schema and number of
820    /// partitions, which will each panic immediately.
821    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
822        let batches_until_panics = vec![0; n_partitions];
823        let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
824        Self {
825            schema,
826            batches_until_panics,
827            cache: Arc::new(cache),
828        }
829    }
830
831    /// Set the number of batches prior to panic for a partition
832    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
833        self.batches_until_panics[partition] = count;
834        self
835    }
836
837    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
838    fn compute_properties(
839        schema: SchemaRef,
840        batches_until_panics: &[usize],
841    ) -> PlanProperties {
842        let num_partitions = batches_until_panics.len();
843        PlanProperties::new(
844            EquivalenceProperties::new(schema),
845            Partitioning::UnknownPartitioning(num_partitions),
846            EmissionType::Incremental,
847            Boundedness::Bounded,
848        )
849    }
850}
851
852impl DisplayAs for PanicExec {
853    fn fmt_as(
854        &self,
855        t: DisplayFormatType,
856        f: &mut std::fmt::Formatter,
857    ) -> std::fmt::Result {
858        match t {
859            DisplayFormatType::Default | DisplayFormatType::Verbose => {
860                write!(f, "PanicExec",)
861            }
862            DisplayFormatType::TreeRender => {
863                // TODO: collect info
864                write!(f, "")
865            }
866        }
867    }
868}
869
870impl ExecutionPlan for PanicExec {
871    fn name(&self) -> &'static str {
872        Self::static_name()
873    }
874
875    fn properties(&self) -> &Arc<PlanProperties> {
876        &self.cache
877    }
878
879    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
880        // this is a leaf node and has no children
881        vec![]
882    }
883
884    fn with_new_children(
885        self: Arc<Self>,
886        _: Vec<Arc<dyn ExecutionPlan>>,
887    ) -> Result<Arc<dyn ExecutionPlan>> {
888        internal_err!("Children cannot be replaced in {:?}", self)
889    }
890
891    fn execute(
892        &self,
893        partition: usize,
894        _context: Arc<TaskContext>,
895    ) -> Result<SendableRecordBatchStream> {
896        Ok(Box::pin(PanicStream {
897            partition,
898            batches_until_panic: self.batches_until_panics[partition],
899            schema: Arc::clone(&self.schema),
900            ready: false,
901        }))
902    }
903}
904
905/// A [`RecordBatchStream`] that yields every other batch and panics
906/// after `batches_until_panic` batches have been produced.
907///
908/// Useful for testing the behavior of streams on panic
909#[derive(Debug)]
910struct PanicStream {
911    /// Which partition was this
912    partition: usize,
913    /// How may batches will be produced until panic
914    batches_until_panic: usize,
915    /// Schema mocked by this stream.
916    schema: SchemaRef,
917    /// Should we return ready ?
918    ready: bool,
919}
920
921impl Stream for PanicStream {
922    type Item = Result<RecordBatch>;
923
924    fn poll_next(
925        mut self: Pin<&mut Self>,
926        cx: &mut Context<'_>,
927    ) -> Poll<Option<Self::Item>> {
928        if self.batches_until_panic > 0 {
929            if self.ready {
930                self.batches_until_panic -= 1;
931                self.ready = false;
932                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
933                return Poll::Ready(Some(Ok(batch)));
934            } else {
935                self.ready = true;
936                // get called again
937                cx.waker().wake_by_ref();
938                return Poll::Pending;
939            }
940        }
941        panic!("PanickingStream did panic: {}", self.partition)
942    }
943}
944
945impl RecordBatchStream for PanicStream {
946    fn schema(&self) -> SchemaRef {
947        Arc::clone(&self.schema)
948    }
949}