datafusion_table_providers/util/
test.rs1use std::{any::Any, sync::Arc};
2
3use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
4use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
5use datafusion::{
6 common::Statistics,
7 error::{DataFusionError, Result},
8 execution::{SendableRecordBatchStream, TaskContext},
9 physical_expr::EquivalenceProperties,
10 physical_plan::{
11 common,
12 stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
13 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
14 },
15};
16
17#[derive(Debug)]
20pub struct MockExec {
21 data: Vec<Result<RecordBatch>>,
23 schema: SchemaRef,
24 use_task: bool,
27 cache: PlanProperties,
28}
29
30impl MockExec {
31 pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
39 let cache = Self::compute_properties(Arc::clone(&schema));
40 Self {
41 data,
42 schema,
43 use_task: true,
44 cache,
45 }
46 }
47
48 pub fn with_use_task(mut self, use_task: bool) -> Self {
52 self.use_task = use_task;
53 self
54 }
55
56 fn compute_properties(schema: SchemaRef) -> PlanProperties {
58 let eq_properties = EquivalenceProperties::new(schema);
59
60 PlanProperties::new(
61 eq_properties,
62 Partitioning::UnknownPartitioning(1),
63 EmissionType::Incremental,
64 Boundedness::Bounded,
65 )
66 }
67}
68
69impl DisplayAs for MockExec {
70 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
71 match t {
72 DisplayFormatType::Default
73 | DisplayFormatType::Verbose
74 | DisplayFormatType::TreeRender => {
75 write!(f, "MockExec")
76 }
77 }
78 }
79}
80
81impl ExecutionPlan for MockExec {
82 fn name(&self) -> &'static str {
83 Self::static_name()
84 }
85
86 fn as_any(&self) -> &dyn Any {
87 self
88 }
89
90 fn properties(&self) -> &PlanProperties {
91 &self.cache
92 }
93
94 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
95 vec![]
96 }
97
98 fn with_new_children(
99 self: Arc<Self>,
100 _: Vec<Arc<dyn ExecutionPlan>>,
101 ) -> Result<Arc<dyn ExecutionPlan>> {
102 unimplemented!()
103 }
104
105 fn execute(
107 &self,
108 partition: usize,
109 _context: Arc<TaskContext>,
110 ) -> Result<SendableRecordBatchStream> {
111 assert_eq!(partition, 0);
112
113 let data: Vec<_> = self
115 .data
116 .iter()
117 .map(|r| match r {
118 Ok(batch) => Ok(batch.clone()),
119 Err(e) => Err(clone_error(e)),
120 })
121 .collect();
122
123 if self.use_task {
124 let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
125 let tx = builder.tx();
129 builder.spawn(async move {
130 for batch in data {
131 println!("Sending batch via delayed stream");
132 if let Err(e) = tx.send(batch).await {
133 println!("ERROR batch via delayed stream: {e}");
134 }
135 }
136
137 Ok(())
138 });
139 Ok(builder.build())
141 } else {
142 let stream = futures::stream::iter(data);
144 Ok(Box::pin(RecordBatchStreamAdapter::new(
145 self.schema(),
146 stream,
147 )))
148 }
149 }
150
151 fn statistics(&self) -> Result<Statistics> {
153 let data: Result<Vec<_>> = self
154 .data
155 .iter()
156 .map(|r| match r {
157 Ok(batch) => Ok(batch.clone()),
158 Err(e) => Err(clone_error(e)),
159 })
160 .collect();
161
162 let data = data?;
163
164 Ok(common::compute_record_batch_statistics(
165 &[data],
166 &self.schema,
167 None,
168 ))
169 }
170}
171
172fn clone_error(e: &DataFusionError) -> DataFusionError {
173 use DataFusionError::*;
174 match e {
175 Execution(msg) => Execution(msg.to_string()),
176 _ => unimplemented!(),
177 }
178}