datafusion_physical_plan/
empty.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
25use crate::{
26 execution_plan::{Boundedness, EmissionType},
27 DisplayFormatType, ExecutionPlan, Partitioning,
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::{internal_err, Result};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use crate::execution_plan::SchedulingType;
37use log::trace;
38
39#[derive(Debug, Clone)]
41pub struct EmptyExec {
42 schema: SchemaRef,
44 partitions: usize,
46 cache: PlanProperties,
47}
48
49impl EmptyExec {
50 pub fn new(schema: SchemaRef) -> Self {
52 let cache = Self::compute_properties(Arc::clone(&schema), 1);
53 EmptyExec {
54 schema,
55 partitions: 1,
56 cache,
57 }
58 }
59
60 pub fn with_partitions(mut self, partitions: usize) -> Self {
62 self.partitions = partitions;
63 let output_partitioning = Self::output_partitioning_helper(self.partitions);
65 self.cache = self.cache.with_partitioning(output_partitioning);
66 self
67 }
68
69 fn data(&self) -> Result<Vec<RecordBatch>> {
70 Ok(vec![])
71 }
72
73 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
74 Partitioning::UnknownPartitioning(n_partitions)
75 }
76
77 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
79 PlanProperties::new(
80 EquivalenceProperties::new(schema),
81 Self::output_partitioning_helper(n_partitions),
82 EmissionType::Incremental,
83 Boundedness::Bounded,
84 )
85 .with_scheduling_type(SchedulingType::Cooperative)
86 }
87}
88
89impl DisplayAs for EmptyExec {
90 fn fmt_as(
91 &self,
92 t: DisplayFormatType,
93 f: &mut std::fmt::Formatter,
94 ) -> std::fmt::Result {
95 match t {
96 DisplayFormatType::Default | DisplayFormatType::Verbose => {
97 write!(f, "EmptyExec")
98 }
99 DisplayFormatType::TreeRender => {
100 write!(f, "")
102 }
103 }
104 }
105}
106
107impl ExecutionPlan for EmptyExec {
108 fn name(&self) -> &'static str {
109 "EmptyExec"
110 }
111
112 fn as_any(&self) -> &dyn Any {
114 self
115 }
116
117 fn properties(&self) -> &PlanProperties {
118 &self.cache
119 }
120
121 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
122 vec![]
123 }
124
125 fn with_new_children(
126 self: Arc<Self>,
127 _: Vec<Arc<dyn ExecutionPlan>>,
128 ) -> Result<Arc<dyn ExecutionPlan>> {
129 Ok(self)
130 }
131
132 fn execute(
133 &self,
134 partition: usize,
135 context: Arc<TaskContext>,
136 ) -> Result<SendableRecordBatchStream> {
137 trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
138
139 if partition >= self.partitions {
140 return internal_err!(
141 "EmptyExec invalid partition {} (expected less than {})",
142 partition,
143 self.partitions
144 );
145 }
146
147 Ok(Box::pin(MemoryStream::try_new(
148 self.data()?,
149 Arc::clone(&self.schema),
150 None,
151 )?))
152 }
153
154 fn statistics(&self) -> Result<Statistics> {
155 self.partition_statistics(None)
156 }
157
158 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
159 if let Some(partition) = partition {
160 if partition >= self.partitions {
161 return internal_err!(
162 "EmptyExec invalid partition {} (expected less than {})",
163 partition,
164 self.partitions
165 );
166 }
167 }
168
169 let batch = self
170 .data()
171 .expect("Create empty RecordBatch should not fail");
172 Ok(common::compute_record_batch_statistics(
173 &[batch],
174 &self.schema,
175 None,
176 ))
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// Executing the single partition of a fresh plan yields no batches.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let schema = test::aggr_test_schema();
        let plan = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(plan.schema(), schema);

        let ctx = Arc::new(TaskContext::default());
        let stream = plan.execute(0, ctx)?;
        let collected = common::collect(stream).await?;
        assert!(collected.is_empty());

        Ok(())
    }

    /// An empty child list is accepted; a non-empty one is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let plan = Arc::new(EmptyExec::new(test::aggr_test_schema()));

        let unchanged = with_new_children_if_necessary(
            Arc::clone(&plan) as Arc<dyn ExecutionPlan>,
            Vec::new(),
        )?;
        assert_eq!(plan.schema(), unchanged.schema());

        let extra_children = vec![unchanged];
        assert!(
            with_new_children_if_necessary(plan, extra_children).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Partition indices at or beyond the partition count are rejected.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());
        let plan = EmptyExec::new(test::aggr_test_schema());

        // A fresh plan has exactly one partition, so only index 0 is valid.
        for out_of_range in [1, 20] {
            assert!(plan.execute(out_of_range, Arc::clone(&ctx)).is_err());
        }
        Ok(())
    }
}