datafusion_physical_plan/
placeholder_row.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::coop::cooperative;
24use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
25use crate::memory::MemoryStream;
26use crate::{
27 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
28 SendableRecordBatchStream, Statistics, common,
29};
30
31use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions};
32use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
33use datafusion_common::{Result, assert_or_internal_err};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use log::trace;
38
/// Execution plan that emits exactly one placeholder row per output partition.
///
/// The emitted batch has one all-null, `Null`-typed column for every field of
/// the configured schema.
#[derive(Debug, Clone)]
pub struct PlaceholderRowExec {
    /// Schema used for the plan's equivalence properties and for the output stream.
    schema: SchemaRef,
    /// Number of output partitions; each valid partition yields the placeholder row.
    partitions: usize,
    /// Cached plan properties (equivalence properties, partitioning, emission
    /// type, boundedness, scheduling type).
    cache: PlanProperties,
}
48
49impl PlaceholderRowExec {
50 pub fn new(schema: SchemaRef) -> Self {
52 let partitions = 1;
53 let cache = Self::compute_properties(Arc::clone(&schema), partitions);
54 PlaceholderRowExec {
55 schema,
56 partitions,
57 cache,
58 }
59 }
60
61 pub fn with_partitions(mut self, partitions: usize) -> Self {
63 self.partitions = partitions;
64 let output_partitioning = Self::output_partitioning_helper(self.partitions);
66 self.cache = self.cache.with_partitioning(output_partitioning);
67 self
68 }
69
70 fn data(&self) -> Result<Vec<RecordBatch>> {
71 Ok({
72 let n_field = self.schema.fields.len();
73 vec![RecordBatch::try_new_with_options(
74 Arc::new(Schema::new(
75 (0..n_field)
76 .map(|i| {
77 Field::new(format!("placeholder_{i}"), DataType::Null, true)
78 })
79 .collect::<Fields>(),
80 )),
81 (0..n_field)
82 .map(|_i| {
83 let ret: ArrayRef = Arc::new(NullArray::new(1));
84 ret
85 })
86 .collect(),
87 &RecordBatchOptions::new().with_row_count(Some(1)),
89 )?]
90 })
91 }
92
93 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
94 Partitioning::UnknownPartitioning(n_partitions)
95 }
96
97 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
99 PlanProperties::new(
100 EquivalenceProperties::new(schema),
101 Self::output_partitioning_helper(n_partitions),
102 EmissionType::Incremental,
103 Boundedness::Bounded,
104 )
105 .with_scheduling_type(SchedulingType::Cooperative)
106 }
107}
108
109impl DisplayAs for PlaceholderRowExec {
110 fn fmt_as(
111 &self,
112 t: DisplayFormatType,
113 f: &mut std::fmt::Formatter,
114 ) -> std::fmt::Result {
115 match t {
116 DisplayFormatType::Default | DisplayFormatType::Verbose => {
117 write!(f, "PlaceholderRowExec")
118 }
119
120 DisplayFormatType::TreeRender => Ok(()),
121 }
122 }
123}
124
125impl ExecutionPlan for PlaceholderRowExec {
126 fn name(&self) -> &'static str {
127 "PlaceholderRowExec"
128 }
129
130 fn as_any(&self) -> &dyn Any {
132 self
133 }
134
135 fn properties(&self) -> &PlanProperties {
136 &self.cache
137 }
138
139 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
140 vec![]
141 }
142
143 fn with_new_children(
144 self: Arc<Self>,
145 _: Vec<Arc<dyn ExecutionPlan>>,
146 ) -> Result<Arc<dyn ExecutionPlan>> {
147 Ok(self)
148 }
149
150 fn execute(
151 &self,
152 partition: usize,
153 context: Arc<TaskContext>,
154 ) -> Result<SendableRecordBatchStream> {
155 trace!(
156 "Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}",
157 partition,
158 context.session_id(),
159 context.task_id()
160 );
161
162 assert_or_internal_err!(
163 partition < self.partitions,
164 "PlaceholderRowExec invalid partition {partition} (expected less than {})",
165 self.partitions
166 );
167
168 let ms = MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None)?;
169 Ok(Box::pin(cooperative(ms)))
170 }
171
172 fn statistics(&self) -> Result<Statistics> {
173 self.partition_statistics(None)
174 }
175
176 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
177 let batches = self
178 .data()
179 .expect("Create single row placeholder RecordBatch should not fail");
180
181 let batches = match partition {
182 Some(_) => vec![batches],
183 None => vec![batches; self.partitions],
185 };
186
187 Ok(common::compute_record_batch_statistics(
188 &batches,
189 &self.schema,
190 None,
191 ))
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();

        let placeholder = Arc::new(PlaceholderRowExec::new(schema));

        let placeholder_2 = with_new_children_if_necessary(
            Arc::clone(&placeholder) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(placeholder.schema(), placeholder_2.schema());

        let too_many_kids = vec![placeholder_2];
        assert!(
            with_new_children_if_necessary(placeholder, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        // Only partition 0 exists, so any other index must be rejected.
        assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(placeholder.execute(20, task_ctx).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        let iter = placeholder.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;

        // Exactly one batch holding exactly one row.
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        for n in 0..partitions {
            let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
            let batches = common::collect(iter).await?;

            // Every partition yields its own single-row batch.
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 1);
        }

        Ok(())
    }

    #[test]
    fn statistics_count_one_row_per_partition() -> Result<()> {
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(3);

        // Whole plan: one placeholder row per partition.
        let whole_plan = placeholder.partition_statistics(None)?;
        assert_eq!(whole_plan.num_rows.get_value(), Some(&3));

        // A single partition holds exactly one row.
        let single_partition = placeholder.partition_statistics(Some(0))?;
        assert_eq!(single_partition.num_rows.get_value(), Some(&1));

        Ok(())
    }
}