datafusion_table_providers/util/

use std::{any::Any, sync::Arc};

use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::{
    common::Statistics,
    error::{DataFusionError, Result},
    execution::{SendableRecordBatchStream, TaskContext},
    physical_expr::EquivalenceProperties,
    physical_plan::{
        common,
        stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
    },
};

/// A Mock ExecutionPlan that can be used for writing tests of other
/// ExecutionPlans
#[derive(Debug)]
pub struct MockExec {
    /// the results to send back
    data: Vec<Result<RecordBatch>>,
    schema: SchemaRef,
    /// if true (the default), sends data using a separate task to ensure the
    /// batches are not available without this stream yielding first
    use_task: bool,
    cache: PlanProperties,
}

impl MockExec {
    /// Create a new `MockExec` with a single partition that returns
    /// the specified `Result`s.
    ///
    /// By default, the batches are not produced immediately (the
    /// caller has to actually yield and another task must run) to
    /// ensure any poll loops are correct. This behavior can be
    /// changed with `with_use_task`
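    ///
    /// # Example
    ///
    /// A minimal usage sketch (marked `ignore` so it is not compiled as a
    /// doctest); the `batch` and `schema` values are assumed to be built by
    /// the caller:
    ///
    /// ```ignore
    /// let exec = MockExec::new(vec![Ok(batch)], schema).with_use_task(false);
    /// let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
    /// let batches = common::collect(stream).await?;
    /// ```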
    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
        let cache = Self::compute_properties(Arc::clone(&schema));
        Self {
            data,
            schema,
            use_task: true,
            cache,
        }
    }

    /// If `use_task` is true (the default) then the batches are sent
    /// back using a separate task to ensure the underlying stream is
    /// not immediately ready
    pub fn with_use_task(mut self, use_task: bool) -> Self {
        self.use_task = use_task;
        self
    }

    /// This function creates the cache object that stores the plan
    /// properties such as schema, equivalence properties, ordering,
    /// partitioning, etc.
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);

        PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for MockExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "MockExec")
            }
        }
    }
}

impl ExecutionPlan for MockExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns a stream which yields data
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(partition, 0);

        // Result doesn't implement Clone, so clone the batches and errors ourselves
        let data: Vec<_> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        if self.use_task {
            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
            // send data in order but in a separate task (to ensure
            // the batches are not available without the stream
            // yielding).
            let tx = builder.tx();
            builder.spawn(async move {
                for batch in data {
                    println!("Sending batch via delayed stream");
                    if let Err(e) = tx.send(batch).await {
                        println!("ERROR sending batch via delayed stream: {e}");
                    }
                }

                Ok(())
            });
            // the returned stream simply reads off the rx end of the channel
            Ok(builder.build())
        } else {
            // return the results directly as a stream, without a separate task
            let stream = futures::stream::iter(data);
            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                stream,
            )))
        }
    }

    // Returns an error if any of the batches is an error
    fn statistics(&self) -> Result<Statistics> {
        let data: Result<Vec<_>> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        let data = data?;

        Ok(common::compute_record_batch_statistics(
            &[data],
            &self.schema,
            None,
        ))
    }
}

fn clone_error(e: &DataFusionError) -> DataFusionError {
    use DataFusionError::*;
    match e {
        Execution(msg) => Execution(msg.to_string()),
        _ => unimplemented!(),
    }
}
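
// A minimal usage sketch for `MockExec`, exercising both the direct and the
// task-driven delivery paths. It assumes `tokio` (with the `macros` and `rt`
// features) is available to the test build and uses a hypothetical
// `one_batch` helper; adjust to the crate's actual dev-dependencies.
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};

    /// Hypothetical helper: builds a schema and a single one-column Int32 batch.
    fn one_batch() -> (SchemaRef, RecordBatch) {
        let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::Int32,
            false,
        )]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .expect("valid batch");
        (schema, batch)
    }

    #[tokio::test]
    async fn mock_exec_yields_batches_without_task() -> Result<()> {
        let (schema, batch) = one_batch();
        // With `use_task` disabled the data is served directly from the
        // returned stream, without spawning a separate task.
        let exec = MockExec::new(vec![Ok(batch)], schema).with_use_task(false);
        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let batches = common::collect(stream).await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
        Ok(())
    }

    #[tokio::test]
    async fn mock_exec_yields_batches_via_task() -> Result<()> {
        let (schema, batch) = one_batch();
        // Default behavior: the batch is sent from a separate task, so the
        // caller has to yield (await) before any data becomes available.
        let exec = MockExec::new(vec![Ok(batch)], schema);
        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let batches = common::collect(stream).await?;
        assert_eq!(batches.len(), 1);
        Ok(())
    }
}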