datafusion_physical_plan/
empty.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
25use crate::{
26 execution_plan::{Boundedness, EmissionType},
27 DisplayFormatType, ExecutionPlan, Partitioning,
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::{internal_err, Result};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use log::trace;
37
38#[derive(Debug, Clone)]
40pub struct EmptyExec {
41 schema: SchemaRef,
43 partitions: usize,
45 cache: PlanProperties,
46}
47
48impl EmptyExec {
49 pub fn new(schema: SchemaRef) -> Self {
51 let cache = Self::compute_properties(Arc::clone(&schema), 1);
52 EmptyExec {
53 schema,
54 partitions: 1,
55 cache,
56 }
57 }
58
59 pub fn with_partitions(mut self, partitions: usize) -> Self {
61 self.partitions = partitions;
62 let output_partitioning = Self::output_partitioning_helper(self.partitions);
64 self.cache = self.cache.with_partitioning(output_partitioning);
65 self
66 }
67
68 fn data(&self) -> Result<Vec<RecordBatch>> {
69 Ok(vec![])
70 }
71
72 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
73 Partitioning::UnknownPartitioning(n_partitions)
74 }
75
76 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
78 PlanProperties::new(
79 EquivalenceProperties::new(schema),
80 Self::output_partitioning_helper(n_partitions),
81 EmissionType::Incremental,
82 Boundedness::Bounded,
83 )
84 }
85}
86
87impl DisplayAs for EmptyExec {
88 fn fmt_as(
89 &self,
90 t: DisplayFormatType,
91 f: &mut std::fmt::Formatter,
92 ) -> std::fmt::Result {
93 match t {
94 DisplayFormatType::Default | DisplayFormatType::Verbose => {
95 write!(f, "EmptyExec")
96 }
97 DisplayFormatType::TreeRender => {
98 write!(f, "")
100 }
101 }
102 }
103}
104
105impl ExecutionPlan for EmptyExec {
106 fn name(&self) -> &'static str {
107 "EmptyExec"
108 }
109
110 fn as_any(&self) -> &dyn Any {
112 self
113 }
114
115 fn properties(&self) -> &PlanProperties {
116 &self.cache
117 }
118
119 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
120 vec![]
121 }
122
123 fn with_new_children(
124 self: Arc<Self>,
125 _: Vec<Arc<dyn ExecutionPlan>>,
126 ) -> Result<Arc<dyn ExecutionPlan>> {
127 Ok(self)
128 }
129
130 fn execute(
131 &self,
132 partition: usize,
133 context: Arc<TaskContext>,
134 ) -> Result<SendableRecordBatchStream> {
135 trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
136
137 if partition >= self.partitions {
138 return internal_err!(
139 "EmptyExec invalid partition {} (expected less than {})",
140 partition,
141 self.partitions
142 );
143 }
144
145 Ok(Box::pin(MemoryStream::try_new(
146 self.data()?,
147 Arc::clone(&self.schema),
148 None,
149 )?))
150 }
151
152 fn statistics(&self) -> Result<Statistics> {
153 self.partition_statistics(None)
154 }
155
156 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
157 if let Some(partition) = partition {
158 if partition >= self.partitions {
159 return internal_err!(
160 "EmptyExec invalid partition {} (expected less than {})",
161 partition,
162 self.partitions
163 );
164 }
165 }
166
167 let batch = self
168 .data()
169 .expect("Create empty RecordBatch should not fail");
170 Ok(common::compute_record_batch_statistics(
171 &[batch],
172 &self.schema,
173 None,
174 ))
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// Executing the single partition keeps the schema and yields no batches.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let schema = test::aggr_test_schema();
        let plan = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(plan.schema(), schema);

        let ctx = Arc::new(TaskContext::default());
        let stream = plan.execute(0, ctx)?;
        let collected = common::collect(stream).await?;
        assert!(collected.is_empty());

        Ok(())
    }

    /// Replacing children with an empty list succeeds; supplying a child is
    /// rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = Arc::new(EmptyExec::new(Arc::clone(&schema)));

        let no_kids = Vec::new();
        let empty2 = with_new_children_if_necessary(
            Arc::clone(&empty) as Arc<dyn ExecutionPlan>,
            no_kids,
        )?;
        assert_eq!(empty.schema(), empty2.schema());

        let too_many_kids = vec![empty2];
        let outcome = with_new_children_if_necessary(empty, too_many_kids);
        assert!(
            outcome.is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Asking for a partition index beyond the single partition is an error.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema);
        let task_ctx = Arc::new(TaskContext::default());

        for bad_partition in [1, 20] {
            assert!(empty.execute(bad_partition, Arc::clone(&task_ctx)).is_err());
        }
        Ok(())
    }
}