// datafusion_table_providers/util/test.rs
use std::{any::Any, sync::Arc};

use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::{
    common::{internal_err, Statistics},
    error::{DataFusionError, Result},
    execution::{SendableRecordBatchStream, TaskContext},
    physical_expr::EquivalenceProperties,
    physical_plan::{
        common,
        stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
    },
};

17#[derive(Debug)]
20pub struct MockExec {
21 data: Vec<Result<RecordBatch>>,
23 schema: SchemaRef,
24 use_task: bool,
27 cache: PlanProperties,
28}
29
30impl MockExec {
31 pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
39 let cache = Self::compute_properties(Arc::clone(&schema));
40 Self {
41 data,
42 schema,
43 use_task: true,
44 cache,
45 }
46 }
47
48 pub fn with_use_task(mut self, use_task: bool) -> Self {
52 self.use_task = use_task;
53 self
54 }
55
56 fn compute_properties(schema: SchemaRef) -> PlanProperties {
58 let eq_properties = EquivalenceProperties::new(schema);
59
60 PlanProperties::new(
61 eq_properties,
62 Partitioning::UnknownPartitioning(1),
63 EmissionType::Incremental,
64 Boundedness::Bounded,
65 )
66 }
67}
68
69impl DisplayAs for MockExec {
70 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
71 match t {
72 DisplayFormatType::Default | DisplayFormatType::Verbose => {
73 write!(f, "MockExec")
74 }
75 }
76 }
77}
78
79impl ExecutionPlan for MockExec {
80 fn name(&self) -> &'static str {
81 Self::static_name()
82 }
83
84 fn as_any(&self) -> &dyn Any {
85 self
86 }
87
88 fn properties(&self) -> &PlanProperties {
89 &self.cache
90 }
91
92 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
93 vec![]
94 }
95
96 fn with_new_children(
97 self: Arc<Self>,
98 _: Vec<Arc<dyn ExecutionPlan>>,
99 ) -> Result<Arc<dyn ExecutionPlan>> {
100 unimplemented!()
101 }
102
103 fn execute(
105 &self,
106 partition: usize,
107 _context: Arc<TaskContext>,
108 ) -> Result<SendableRecordBatchStream> {
109 assert_eq!(partition, 0);
110
111 let data: Vec<_> = self
113 .data
114 .iter()
115 .map(|r| match r {
116 Ok(batch) => Ok(batch.clone()),
117 Err(e) => Err(clone_error(e)),
118 })
119 .collect();
120
121 if self.use_task {
122 let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
123 let tx = builder.tx();
127 builder.spawn(async move {
128 for batch in data {
129 println!("Sending batch via delayed stream");
130 if let Err(e) = tx.send(batch).await {
131 println!("ERROR batch via delayed stream: {e}");
132 }
133 }
134
135 Ok(())
136 });
137 Ok(builder.build())
139 } else {
140 let stream = futures::stream::iter(data);
142 Ok(Box::pin(RecordBatchStreamAdapter::new(
143 self.schema(),
144 stream,
145 )))
146 }
147 }
148
149 fn statistics(&self) -> Result<Statistics> {
151 let data: Result<Vec<_>> = self
152 .data
153 .iter()
154 .map(|r| match r {
155 Ok(batch) => Ok(batch.clone()),
156 Err(e) => Err(clone_error(e)),
157 })
158 .collect();
159
160 let data = data?;
161
162 Ok(common::compute_record_batch_statistics(
163 &[data],
164 &self.schema,
165 None,
166 ))
167 }
168}
169
170fn clone_error(e: &DataFusionError) -> DataFusionError {
171 use DataFusionError::*;
172 match e {
173 Execution(msg) => Execution(msg.to_string()),
174 _ => unimplemented!(),
175 }
176}