datafusion_physical_plan/
empty.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
25use crate::{
26 DisplayFormatType, ExecutionPlan, Partitioning,
27 execution_plan::{Boundedness, EmissionType},
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::stats::Precision;
33use datafusion_common::{ColumnStatistics, Result, ScalarValue, assert_or_internal_err};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use crate::execution_plan::SchedulingType;
38use log::trace;
39
/// Execution plan for an empty relation: executing any of its partitions
/// yields no record batches.
///
/// The number of advertised output partitions is configurable (see
/// `EmptyExec::with_partitions`); `EmptyExec::new` starts with one.
#[derive(Debug, Clone)]
pub struct EmptyExec {
    /// Schema of the (empty) output stream.
    schema: SchemaRef,
    /// Number of output partitions this plan reports; `execute` and
    /// `partition_statistics` reject partition indices `>= partitions`.
    partitions: usize,
    /// Cached plan properties (equivalence properties, partitioning,
    /// emission type, boundedness, scheduling type).
    cache: Arc<PlanProperties>,
}
49
50impl EmptyExec {
51 pub fn new(schema: SchemaRef) -> Self {
53 let cache = Self::compute_properties(Arc::clone(&schema), 1);
54 EmptyExec {
55 schema,
56 partitions: 1,
57 cache: Arc::new(cache),
58 }
59 }
60
61 pub fn with_partitions(mut self, partitions: usize) -> Self {
63 self.partitions = partitions;
64 let output_partitioning = Self::output_partitioning_helper(self.partitions);
66 Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
67 self
68 }
69
70 fn data(&self) -> Result<Vec<RecordBatch>> {
71 Ok(vec![])
72 }
73
74 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
75 Partitioning::UnknownPartitioning(n_partitions)
76 }
77
78 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
80 PlanProperties::new(
81 EquivalenceProperties::new(schema),
82 Self::output_partitioning_helper(n_partitions),
83 EmissionType::Incremental,
84 Boundedness::Bounded,
85 )
86 .with_scheduling_type(SchedulingType::Cooperative)
87 }
88}
89
90impl DisplayAs for EmptyExec {
91 fn fmt_as(
92 &self,
93 t: DisplayFormatType,
94 f: &mut std::fmt::Formatter,
95 ) -> std::fmt::Result {
96 match t {
97 DisplayFormatType::Default | DisplayFormatType::Verbose => {
98 write!(f, "EmptyExec")
99 }
100 DisplayFormatType::TreeRender => {
101 write!(f, "")
103 }
104 }
105 }
106}
107
108impl ExecutionPlan for EmptyExec {
109 fn name(&self) -> &'static str {
110 "EmptyExec"
111 }
112
113 fn as_any(&self) -> &dyn Any {
115 self
116 }
117
118 fn properties(&self) -> &Arc<PlanProperties> {
119 &self.cache
120 }
121
122 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
123 vec![]
124 }
125
126 fn with_new_children(
127 self: Arc<Self>,
128 _: Vec<Arc<dyn ExecutionPlan>>,
129 ) -> Result<Arc<dyn ExecutionPlan>> {
130 Ok(self)
131 }
132
133 fn execute(
134 &self,
135 partition: usize,
136 context: Arc<TaskContext>,
137 ) -> Result<SendableRecordBatchStream> {
138 trace!(
139 "Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}",
140 partition,
141 context.session_id(),
142 context.task_id()
143 );
144
145 assert_or_internal_err!(
146 partition < self.partitions,
147 "EmptyExec invalid partition {} (expected less than {})",
148 partition,
149 self.partitions
150 );
151
152 Ok(Box::pin(MemoryStream::try_new(
153 self.data()?,
154 Arc::clone(&self.schema),
155 None,
156 )?))
157 }
158
159 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
160 if let Some(partition) = partition {
161 assert_or_internal_err!(
162 partition < self.partitions,
163 "EmptyExec invalid partition {} (expected less than {})",
164 partition,
165 self.partitions
166 );
167 }
168
169 let mut stats = Statistics::default()
171 .with_num_rows(Precision::Exact(0))
172 .with_total_byte_size(Precision::Exact(0));
173
174 for _ in self.schema.fields() {
176 stats = stats.add_column_statistics(ColumnStatistics {
177 null_count: Precision::Exact(0),
178 distinct_count: Precision::Exact(0),
179 min_value: Precision::<ScalarValue>::Absent,
180 max_value: Precision::<ScalarValue>::Absent,
181 sum_value: Precision::<ScalarValue>::Absent,
182 byte_size: Precision::Exact(0),
183 });
184 }
185
186 Ok(stats)
187 }
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common;
    use crate::test;
    use crate::with_new_children_if_necessary;

    #[tokio::test]
    async fn empty() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();

        let empty = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(empty.schema(), schema);

        // executing the single default partition yields no batches
        let iter = empty.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert!(batches.is_empty());

        Ok(())
    }

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = Arc::new(EmptyExec::new(Arc::clone(&schema)));

        let empty2 = with_new_children_if_necessary(
            Arc::clone(&empty) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(empty.schema(), empty2.schema());

        let too_many_kids = vec![empty2];
        assert!(
            with_new_children_if_necessary(empty, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema);

        // only partition 0 exists by default
        assert!(empty.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(empty.execute(20, task_ctx).is_err());
        Ok(())
    }

    /// Every partition advertised via `with_partitions` executes and yields no
    /// batches; the first index past the end is rejected.
    #[tokio::test]
    async fn with_partitions_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema).with_partitions(3);
        assert_eq!(empty.output_partitioning().partition_count(), 3);

        for partition in 0..3 {
            let stream = empty.execute(partition, Arc::clone(&task_ctx))?;
            let batches = common::collect(stream).await?;
            assert!(batches.is_empty());
        }
        assert!(empty.execute(3, task_ctx).is_err());
        Ok(())
    }

    /// Statistics are exact zeros for the whole plan and for each valid
    /// partition, with one column entry per schema field; out-of-range
    /// partitions are rejected.
    #[test]
    fn statistics_are_exact_zero() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(Arc::clone(&schema)).with_partitions(2);

        for partition in [None, Some(0), Some(1)] {
            let stats = empty.partition_statistics(partition)?;
            assert_eq!(stats.num_rows, Precision::Exact(0));
            assert_eq!(stats.total_byte_size, Precision::Exact(0));
            assert_eq!(stats.column_statistics.len(), schema.fields().len());
            for col in &stats.column_statistics {
                assert_eq!(col.null_count, Precision::Exact(0));
                assert_eq!(col.distinct_count, Precision::Exact(0));
                assert_eq!(col.byte_size, Precision::Exact(0));
            }
        }

        assert!(empty.partition_statistics(Some(2)).is_err());
        Ok(())
    }
}