datafusion_physical_plan/
empty.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics, common};
25use crate::{
26 DisplayFormatType, ExecutionPlan, Partitioning,
27 execution_plan::{Boundedness, EmissionType},
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::{Result, assert_or_internal_err};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use crate::execution_plan::SchedulingType;
37use log::trace;
38
39#[derive(Debug, Clone)]
41pub struct EmptyExec {
42 schema: SchemaRef,
44 partitions: usize,
46 cache: PlanProperties,
47}
48
49impl EmptyExec {
50 pub fn new(schema: SchemaRef) -> Self {
52 let cache = Self::compute_properties(Arc::clone(&schema), 1);
53 EmptyExec {
54 schema,
55 partitions: 1,
56 cache,
57 }
58 }
59
60 pub fn with_partitions(mut self, partitions: usize) -> Self {
62 self.partitions = partitions;
63 let output_partitioning = Self::output_partitioning_helper(self.partitions);
65 self.cache = self.cache.with_partitioning(output_partitioning);
66 self
67 }
68
69 fn data(&self) -> Result<Vec<RecordBatch>> {
70 Ok(vec![])
71 }
72
73 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
74 Partitioning::UnknownPartitioning(n_partitions)
75 }
76
77 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
79 PlanProperties::new(
80 EquivalenceProperties::new(schema),
81 Self::output_partitioning_helper(n_partitions),
82 EmissionType::Incremental,
83 Boundedness::Bounded,
84 )
85 .with_scheduling_type(SchedulingType::Cooperative)
86 }
87}
88
89impl DisplayAs for EmptyExec {
90 fn fmt_as(
91 &self,
92 t: DisplayFormatType,
93 f: &mut std::fmt::Formatter,
94 ) -> std::fmt::Result {
95 match t {
96 DisplayFormatType::Default | DisplayFormatType::Verbose => {
97 write!(f, "EmptyExec")
98 }
99 DisplayFormatType::TreeRender => {
100 write!(f, "")
102 }
103 }
104 }
105}
106
107impl ExecutionPlan for EmptyExec {
108 fn name(&self) -> &'static str {
109 "EmptyExec"
110 }
111
112 fn as_any(&self) -> &dyn Any {
114 self
115 }
116
117 fn properties(&self) -> &PlanProperties {
118 &self.cache
119 }
120
121 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
122 vec![]
123 }
124
125 fn with_new_children(
126 self: Arc<Self>,
127 _: Vec<Arc<dyn ExecutionPlan>>,
128 ) -> Result<Arc<dyn ExecutionPlan>> {
129 Ok(self)
130 }
131
132 fn execute(
133 &self,
134 partition: usize,
135 context: Arc<TaskContext>,
136 ) -> Result<SendableRecordBatchStream> {
137 trace!(
138 "Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}",
139 partition,
140 context.session_id(),
141 context.task_id()
142 );
143
144 assert_or_internal_err!(
145 partition < self.partitions,
146 "EmptyExec invalid partition {} (expected less than {})",
147 partition,
148 self.partitions
149 );
150
151 Ok(Box::pin(MemoryStream::try_new(
152 self.data()?,
153 Arc::clone(&self.schema),
154 None,
155 )?))
156 }
157
158 fn statistics(&self) -> Result<Statistics> {
159 self.partition_statistics(None)
160 }
161
162 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
163 if let Some(partition) = partition {
164 assert_or_internal_err!(
165 partition < self.partitions,
166 "EmptyExec invalid partition {} (expected less than {})",
167 partition,
168 self.partitions
169 );
170 }
171
172 let batch = self
173 .data()
174 .expect("Create empty RecordBatch should not fail");
175 Ok(common::compute_record_batch_statistics(
176 &[batch],
177 &self.schema,
178 None,
179 ))
180 }
181}
182
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;
    use datafusion_common::stats::Precision;

    #[tokio::test]
    async fn empty() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();

        let empty = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(empty.schema(), schema);

        // We should have no results
        let iter = empty.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert!(batches.is_empty());

        Ok(())
    }

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = Arc::new(EmptyExec::new(Arc::clone(&schema)));

        let empty2 = with_new_children_if_necessary(
            Arc::clone(&empty) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(empty.schema(), empty2.schema());

        let too_many_kids = vec![empty2];
        assert!(
            with_new_children_if_necessary(empty, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema);

        // Ask for partitions that don't exist.
        assert!(empty.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(empty.execute(20, task_ctx).is_err());
        Ok(())
    }

    /// `with_partitions` must widen both the reported partitioning and the
    /// range of partitions `execute` accepts; each partition is still empty.
    #[tokio::test]
    async fn multiple_partitions() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema).with_partitions(3);
        assert_eq!(empty.output_partitioning().partition_count(), 3);

        for partition in 0..3 {
            let stream = empty.execute(partition, Arc::clone(&task_ctx))?;
            let batches = common::collect(stream).await?;
            assert!(batches.is_empty());
        }
        assert!(empty.execute(3, task_ctx).is_err());
        Ok(())
    }

    /// Statistics report zero rows, both whole-plan and per-partition, and
    /// reject out-of-range partitions.
    #[test]
    fn partition_statistics_are_empty() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema).with_partitions(2);

        assert_eq!(
            empty.partition_statistics(None)?.num_rows,
            Precision::Exact(0)
        );
        assert_eq!(
            empty.partition_statistics(Some(1))?.num_rows,
            Precision::Exact(0)
        );
        assert!(empty.partition_statistics(Some(2)).is_err());
        Ok(())
    }
}
235}