datafusion_physical_plan/
empty.rs1use std::sync::Arc;
21
22use crate::memory::MemoryStream;
23use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
24use crate::{
25 DisplayFormatType, ExecutionPlan, Partitioning,
26 execution_plan::{Boundedness, EmissionType},
27};
28
29use arrow::datatypes::SchemaRef;
30use arrow::record_batch::RecordBatch;
31use datafusion_common::stats::Precision;
32use datafusion_common::{ColumnStatistics, Result, ScalarValue, assert_or_internal_err};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use crate::execution_plan::SchedulingType;
37use log::trace;
38
39#[derive(Debug, Clone)]
41pub struct EmptyExec {
42 schema: SchemaRef,
44 partitions: usize,
46 cache: Arc<PlanProperties>,
47}
48
49impl EmptyExec {
50 pub fn new(schema: SchemaRef) -> Self {
52 let cache = Self::compute_properties(Arc::clone(&schema), 1);
53 EmptyExec {
54 schema,
55 partitions: 1,
56 cache: Arc::new(cache),
57 }
58 }
59
60 pub fn with_partitions(mut self, partitions: usize) -> Self {
62 self.partitions = partitions;
63 let output_partitioning = Self::output_partitioning_helper(self.partitions);
65 Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
66 self
67 }
68
69 fn data(&self) -> Result<Vec<RecordBatch>> {
70 Ok(vec![])
71 }
72
73 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
74 Partitioning::UnknownPartitioning(n_partitions)
75 }
76
77 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
79 PlanProperties::new(
80 EquivalenceProperties::new(schema),
81 Self::output_partitioning_helper(n_partitions),
82 EmissionType::Incremental,
83 Boundedness::Bounded,
84 )
85 .with_scheduling_type(SchedulingType::Cooperative)
86 }
87}
88
89impl DisplayAs for EmptyExec {
90 fn fmt_as(
91 &self,
92 t: DisplayFormatType,
93 f: &mut std::fmt::Formatter,
94 ) -> std::fmt::Result {
95 match t {
96 DisplayFormatType::Default | DisplayFormatType::Verbose => {
97 write!(f, "EmptyExec")
98 }
99 DisplayFormatType::TreeRender => {
100 write!(f, "")
102 }
103 }
104 }
105}
106
107impl ExecutionPlan for EmptyExec {
108 fn name(&self) -> &'static str {
109 "EmptyExec"
110 }
111
112 fn properties(&self) -> &Arc<PlanProperties> {
114 &self.cache
115 }
116
117 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
118 vec![]
119 }
120
121 fn with_new_children(
122 self: Arc<Self>,
123 _: Vec<Arc<dyn ExecutionPlan>>,
124 ) -> Result<Arc<dyn ExecutionPlan>> {
125 Ok(self)
126 }
127
128 fn execute(
129 &self,
130 partition: usize,
131 context: Arc<TaskContext>,
132 ) -> Result<SendableRecordBatchStream> {
133 trace!(
134 "Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}",
135 partition,
136 context.session_id(),
137 context.task_id()
138 );
139
140 assert_or_internal_err!(
141 partition < self.partitions,
142 "EmptyExec invalid partition {} (expected less than {})",
143 partition,
144 self.partitions
145 );
146
147 Ok(Box::pin(MemoryStream::try_new(
148 self.data()?,
149 Arc::clone(&self.schema),
150 None,
151 )?))
152 }
153
154 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
155 if let Some(partition) = partition {
156 assert_or_internal_err!(
157 partition < self.partitions,
158 "EmptyExec invalid partition {} (expected less than {})",
159 partition,
160 self.partitions
161 );
162 }
163
164 let mut stats = Statistics::default()
166 .with_num_rows(Precision::Exact(0))
167 .with_total_byte_size(Precision::Exact(0));
168
169 for _ in self.schema.fields() {
171 stats = stats.add_column_statistics(ColumnStatistics {
172 null_count: Precision::Exact(0),
173 distinct_count: Precision::Exact(0),
174 min_value: Precision::<ScalarValue>::Absent,
175 max_value: Precision::<ScalarValue>::Absent,
176 sum_value: Precision::<ScalarValue>::Absent,
177 byte_size: Precision::Exact(0),
178 });
179 }
180
181 Ok(Arc::new(stats))
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// A default `EmptyExec` reports its schema and yields no batches.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();

        let empty = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(empty.schema(), schema);

        // We should have no results.
        let iter = empty.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert!(batches.is_empty());

        Ok(())
    }

    /// Replacing children with none keeps the plan, and any child is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = Arc::new(EmptyExec::new(Arc::clone(&schema)));

        let empty2 = with_new_children_if_necessary(
            Arc::clone(&empty) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(empty.schema(), empty2.schema());

        let too_many_kids = vec![empty2];
        assert!(
            with_new_children_if_necessary(empty, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Executing a partition index beyond the single default partition fails.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema);

        // Ask for the wrong partition.
        assert!(empty.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(empty.execute(20, task_ctx).is_err());
        Ok(())
    }

    /// `with_partitions` must update both the advertised partitioning and the
    /// range of partition indices `execute` accepts.
    #[tokio::test]
    async fn multiple_partitions() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(Arc::clone(&schema)).with_partitions(3);

        assert_eq!(empty.output_partitioning().partition_count(), 3);

        for partition in 0..3 {
            let stream = empty.execute(partition, Arc::clone(&task_ctx))?;
            let batches = common::collect(stream).await?;
            assert!(batches.is_empty());
        }
        assert!(empty.execute(3, task_ctx).is_err());
        Ok(())
    }

    /// Statistics are exactly zero rows for the whole plan and for each valid
    /// partition, include one entry per column, and reject out-of-range partitions.
    #[test]
    fn statistics() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(Arc::clone(&schema)).with_partitions(2);

        for partition in [None, Some(0), Some(1)] {
            let stats = empty.partition_statistics(partition)?;
            assert_eq!(stats.num_rows, Precision::Exact(0));
            assert_eq!(stats.total_byte_size, Precision::Exact(0));
            assert_eq!(stats.column_statistics.len(), schema.fields().len());
        }
        assert!(empty.partition_statistics(Some(2)).is_err());
        Ok(())
    }
}