datafusion_physical_plan/
placeholder_row.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::coop::cooperative;
24use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
25use crate::memory::MemoryStream;
26use crate::{
27 common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
28 SendableRecordBatchStream, Statistics,
29};
30
31use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions};
32use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
33use datafusion_common::{internal_err, Result};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use log::trace;
38
39#[derive(Debug, Clone)]
41pub struct PlaceholderRowExec {
42 schema: SchemaRef,
44 partitions: usize,
46 cache: PlanProperties,
47}
48
49impl PlaceholderRowExec {
50 pub fn new(schema: SchemaRef) -> Self {
52 let partitions = 1;
53 let cache = Self::compute_properties(Arc::clone(&schema), partitions);
54 PlaceholderRowExec {
55 schema,
56 partitions,
57 cache,
58 }
59 }
60
61 pub fn with_partitions(mut self, partitions: usize) -> Self {
63 self.partitions = partitions;
64 let output_partitioning = Self::output_partitioning_helper(self.partitions);
66 self.cache = self.cache.with_partitioning(output_partitioning);
67 self
68 }
69
70 fn data(&self) -> Result<Vec<RecordBatch>> {
71 Ok({
72 let n_field = self.schema.fields.len();
73 vec![RecordBatch::try_new_with_options(
74 Arc::new(Schema::new(
75 (0..n_field)
76 .map(|i| {
77 Field::new(format!("placeholder_{i}"), DataType::Null, true)
78 })
79 .collect::<Fields>(),
80 )),
81 (0..n_field)
82 .map(|_i| {
83 let ret: ArrayRef = Arc::new(NullArray::new(1));
84 ret
85 })
86 .collect(),
87 &RecordBatchOptions::new().with_row_count(Some(1)),
89 )?]
90 })
91 }
92
93 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
94 Partitioning::UnknownPartitioning(n_partitions)
95 }
96
97 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
99 PlanProperties::new(
100 EquivalenceProperties::new(schema),
101 Self::output_partitioning_helper(n_partitions),
102 EmissionType::Incremental,
103 Boundedness::Bounded,
104 )
105 .with_scheduling_type(SchedulingType::Cooperative)
106 }
107}
108
109impl DisplayAs for PlaceholderRowExec {
110 fn fmt_as(
111 &self,
112 t: DisplayFormatType,
113 f: &mut std::fmt::Formatter,
114 ) -> std::fmt::Result {
115 match t {
116 DisplayFormatType::Default | DisplayFormatType::Verbose => {
117 write!(f, "PlaceholderRowExec")
118 }
119
120 DisplayFormatType::TreeRender => Ok(()),
121 }
122 }
123}
124
125impl ExecutionPlan for PlaceholderRowExec {
126 fn name(&self) -> &'static str {
127 "PlaceholderRowExec"
128 }
129
130 fn as_any(&self) -> &dyn Any {
132 self
133 }
134
135 fn properties(&self) -> &PlanProperties {
136 &self.cache
137 }
138
139 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
140 vec![]
141 }
142
143 fn with_new_children(
144 self: Arc<Self>,
145 _: Vec<Arc<dyn ExecutionPlan>>,
146 ) -> Result<Arc<dyn ExecutionPlan>> {
147 Ok(self)
148 }
149
150 fn execute(
151 &self,
152 partition: usize,
153 context: Arc<TaskContext>,
154 ) -> Result<SendableRecordBatchStream> {
155 trace!("Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
156
157 if partition >= self.partitions {
158 return internal_err!(
159 "PlaceholderRowExec invalid partition {} (expected less than {})",
160 partition,
161 self.partitions
162 );
163 }
164
165 let ms = MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None)?;
166 Ok(Box::pin(cooperative(ms)))
167 }
168
169 fn statistics(&self) -> Result<Statistics> {
170 self.partition_statistics(None)
171 }
172
173 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
174 let batches = self
175 .data()
176 .expect("Create single row placeholder RecordBatch should not fail");
177
178 let batches = match partition {
179 Some(_) => vec![batches],
180 None => vec![batches; self.partitions],
182 };
183
184 Ok(common::compute_record_batch_statistics(
185 &batches,
186 &self.schema,
187 None,
188 ))
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// Replacing children with an empty list succeeds and keeps the schema;
    /// supplying any child is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();

        let placeholder = Arc::new(PlaceholderRowExec::new(schema));

        let placeholder_2 = with_new_children_if_necessary(
            Arc::clone(&placeholder) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(placeholder.schema(), placeholder_2.schema());

        let too_many_kids = vec![placeholder_2];
        assert!(
            with_new_children_if_necessary(placeholder, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// A freshly created plan has one partition, so any index other than 0
    /// must be rejected.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        // Ask for the wrong partition
        assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(placeholder.execute(20, task_ctx).is_err());
        Ok(())
    }

    /// The single partition yields exactly one batch holding exactly one row.
    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        let iter = placeholder.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;

        // Should have one item
        assert_eq!(batches.len(), 1);
        // ... and that item should contain the single placeholder row
        assert_eq!(batches[0].num_rows(), 1);

        Ok(())
    }

    /// Every partition of a multi-partition plan yields one one-row batch.
    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        for n in 0..partitions {
            let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
            let batches = common::collect(iter).await?;

            // Should have one item
            assert_eq!(batches.len(), 1);
            // ... and that item should contain the single placeholder row
            assert_eq!(batches[0].num_rows(), 1);
        }

        Ok(())
    }
}