datafusion_physical_plan/
placeholder_row.rs1use std::sync::Arc;
21
22use crate::coop::cooperative;
23use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
24use crate::memory::MemoryStream;
25use crate::{
26 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
27 SendableRecordBatchStream, Statistics, common,
28};
29
30use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions};
31use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
32use datafusion_common::{Result, assert_or_internal_err};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use log::trace;
37
38#[derive(Debug, Clone)]
40pub struct PlaceholderRowExec {
41 schema: SchemaRef,
43 partitions: usize,
45 cache: Arc<PlanProperties>,
46}
47
48impl PlaceholderRowExec {
49 pub fn new(schema: SchemaRef) -> Self {
51 let partitions = 1;
52 let cache = Self::compute_properties(Arc::clone(&schema), partitions);
53 PlaceholderRowExec {
54 schema,
55 partitions,
56 cache: Arc::new(cache),
57 }
58 }
59
60 pub fn with_partitions(mut self, partitions: usize) -> Self {
62 self.partitions = partitions;
63 let output_partitioning = Self::output_partitioning_helper(self.partitions);
65 Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
66 self
67 }
68
69 fn data(&self) -> Result<Vec<RecordBatch>> {
70 Ok({
71 let n_field = self.schema.fields.len();
72 vec![RecordBatch::try_new_with_options(
73 Arc::new(Schema::new(
74 (0..n_field)
75 .map(|i| {
76 Field::new(format!("placeholder_{i}"), DataType::Null, true)
77 })
78 .collect::<Fields>(),
79 )),
80 (0..n_field)
81 .map(|_i| {
82 let ret: ArrayRef = Arc::new(NullArray::new(1));
83 ret
84 })
85 .collect(),
86 &RecordBatchOptions::new().with_row_count(Some(1)),
88 )?]
89 })
90 }
91
92 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
93 Partitioning::UnknownPartitioning(n_partitions)
94 }
95
96 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
98 PlanProperties::new(
99 EquivalenceProperties::new(schema),
100 Self::output_partitioning_helper(n_partitions),
101 EmissionType::Incremental,
102 Boundedness::Bounded,
103 )
104 .with_scheduling_type(SchedulingType::Cooperative)
105 }
106}
107
108impl DisplayAs for PlaceholderRowExec {
109 fn fmt_as(
110 &self,
111 t: DisplayFormatType,
112 f: &mut std::fmt::Formatter,
113 ) -> std::fmt::Result {
114 match t {
115 DisplayFormatType::Default | DisplayFormatType::Verbose => {
116 write!(f, "PlaceholderRowExec")
117 }
118
119 DisplayFormatType::TreeRender => Ok(()),
120 }
121 }
122}
123
124impl ExecutionPlan for PlaceholderRowExec {
125 fn name(&self) -> &'static str {
126 "PlaceholderRowExec"
127 }
128
129 fn properties(&self) -> &Arc<PlanProperties> {
131 &self.cache
132 }
133
134 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
135 vec![]
136 }
137
138 fn with_new_children(
139 self: Arc<Self>,
140 _: Vec<Arc<dyn ExecutionPlan>>,
141 ) -> Result<Arc<dyn ExecutionPlan>> {
142 Ok(self)
143 }
144
145 fn execute(
146 &self,
147 partition: usize,
148 context: Arc<TaskContext>,
149 ) -> Result<SendableRecordBatchStream> {
150 trace!(
151 "Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}",
152 partition,
153 context.session_id(),
154 context.task_id()
155 );
156
157 assert_or_internal_err!(
158 partition < self.partitions,
159 "PlaceholderRowExec invalid partition {partition} (expected less than {})",
160 self.partitions
161 );
162
163 let ms = MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None)?;
164 Ok(Box::pin(cooperative(ms)))
165 }
166
167 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
168 let batches = self
169 .data()
170 .expect("Create single row placeholder RecordBatch should not fail");
171
172 let batches = match partition {
173 Some(_) => vec![batches],
174 None => vec![batches; self.partitions],
176 };
177
178 Ok(Arc::new(common::compute_record_batch_statistics(
179 &batches,
180 &self.schema,
181 None,
182 )))
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// Replacing children with an empty list succeeds and keeps the schema;
    /// supplying any child is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();

        let placeholder = Arc::new(PlaceholderRowExec::new(schema));

        let placeholder_2 = with_new_children_if_necessary(
            Arc::clone(&placeholder) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(placeholder.schema(), placeholder_2.schema());

        let too_many_kids = vec![placeholder_2];
        assert!(
            with_new_children_if_necessary(placeholder, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Executing a partition index outside the configured range fails.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        // Only partition 0 exists by default, so both requests are invalid.
        assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(placeholder.execute(20, task_ctx).is_err());
        Ok(())
    }

    /// The default single partition yields one batch holding one row.
    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        let iter = placeholder.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;

        // Exactly one batch, and that batch contains exactly one row.
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);

        Ok(())
    }

    /// Every configured partition yields one batch holding one row.
    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        for n in 0..partitions {
            let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
            let batches = common::collect(iter).await?;

            // Exactly one batch with exactly one row per partition.
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 1);
        }

        Ok(())
    }
}