// datafusion_physical_plan/placeholder_row.rs

use std::any::Any;
use std::sync::Arc;

use crate::execution_plan::{Boundedness, EmissionType};
use crate::memory::MemoryStream;
use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::array::{ArrayRef, NullArray};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

use log::trace;

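/// Execution plan that produces a single placeholder row of null values per
/// partition. It is used as the source for queries that have no table to scan
/// but must still emit one row (for example `SELECT 1`).
///
/// A minimal usage sketch (the `schema` and `task_ctx` values are assumed to
/// be supplied by the caller; illustrative only):
///
/// ```ignore
/// let exec = PlaceholderRowExec::new(schema).with_partitions(2);
/// let stream = exec.execute(0, task_ctx)?;
/// ```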
#[derive(Debug, Clone)]
pub struct PlaceholderRowExec {
    /// Schema this plan reports to callers
    schema: SchemaRef,
    /// Number of output partitions
    partitions: usize,
    /// Cached plan properties
    cache: PlanProperties,
}

impl PlaceholderRowExec {
    /// Create a new PlaceholderRowExec with a single output partition
    pub fn new(schema: SchemaRef) -> Self {
        let partitions = 1;
        let cache = Self::compute_properties(Arc::clone(&schema), partitions);
        PlaceholderRowExec {
            schema,
            partitions,
            cache,
        }
    }

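    /// Set the number of output partitions; every partition produces the same
    /// single placeholder row when executed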
    pub fn with_partitions(mut self, partitions: usize) -> Self {
        self.partitions = partitions;
        // Recompute the cached partitioning to match the new partition count
        let output_partitioning = Self::output_partitioning_helper(self.partitions);
        self.cache = self.cache.with_partitioning(output_partitioning);
        self
    }

    fn data(&self) -> Result<Vec<RecordBatch>> {
        Ok({
            let n_field = self.schema.fields.len();
            vec![RecordBatch::try_new_with_options(
                Arc::new(Schema::new(
                    (0..n_field)
                        .map(|i| {
                            Field::new(format!("placeholder_{i}"), DataType::Null, true)
                        })
                        .collect::<Fields>(),
                )),
                (0..n_field)
                    .map(|_i| {
                        let ret: ArrayRef = Arc::new(NullArray::new(1));
                        ret
                    })
                    .collect(),
                // Explicitly set the row count so a single row is produced even
                // when the schema contains no columns
                &RecordBatchOptions::new().with_row_count(Some(1)),
            )?]
        })
    }

    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
        Partitioning::UnknownPartitioning(n_partitions)
    }

    /// Creates the cached [`PlanProperties`] (equivalence properties,
    /// partitioning, emission type, and boundedness) for this plan
    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Self::output_partitioning_helper(n_partitions),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for PlaceholderRowExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "PlaceholderRowExec")
            }

            DisplayFormatType::TreeRender => Ok(()),
        }
    }
}

impl ExecutionPlan for PlaceholderRowExec {
    fn name(&self) -> &'static str {
        "PlaceholderRowExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        if partition >= self.partitions {
            return internal_err!(
                "PlaceholderRowExec invalid partition {} (expected less than {})",
                partition,
                self.partitions
            );
        }

        Ok(Box::pin(MemoryStream::try_new(
            self.data()?,
            Arc::clone(&self.schema),
            None,
        )?))
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            // Per-partition statistics are not tracked; they are only computed
            // for the plan as a whole
            return Ok(Statistics::new_unknown(&self.schema()));
        }
        let batch = self
            .data()
            .expect("Create single row placeholder RecordBatch should not fail");
        Ok(common::compute_record_batch_statistics(
            &[batch],
            &self.schema,
            None,
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();

        let placeholder = Arc::new(PlaceholderRowExec::new(schema));

        let placeholder_2 = with_new_children_if_necessary(
            Arc::clone(&placeholder) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(placeholder.schema(), placeholder_2.schema());

        let too_many_kids = vec![placeholder_2];
        assert!(
            with_new_children_if_necessary(placeholder, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        // Ask for partitions that do not exist; only partition 0 is valid here
        assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(placeholder.execute(20, task_ctx).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        let iter = placeholder.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;

        // Exactly one batch is expected
        assert_eq!(batches.len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        for n in 0..partitions {
            let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
            let batches = common::collect(iter).await?;

            // Each partition yields exactly one batch
            assert_eq!(batches.len(), 1);
        }

        Ok(())
    }
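
    // Additional sketch of a check on the produced data, using the same test
    // helpers as above: the single batch should contain exactly one row, and
    // every column should be a null-typed placeholder column.
    #[tokio::test]
    async fn produce_single_null_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        let batches = common::collect(placeholder.execute(0, task_ctx)?).await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
        for column in batches[0].columns() {
            assert_eq!(column.data_type(), &DataType::Null);
        }

        Ok(())
    }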
}