datafusion_physical_plan/
placeholder_row.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::coop::cooperative;
24use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
25use crate::memory::MemoryStream;
26use crate::{
27 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
28 SendableRecordBatchStream, Statistics, common,
29};
30
31use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions};
32use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
33use datafusion_common::{Result, assert_or_internal_err};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use log::trace;
38
39#[derive(Debug, Clone)]
41pub struct PlaceholderRowExec {
42 schema: SchemaRef,
44 partitions: usize,
46 cache: Arc<PlanProperties>,
47}
48
49impl PlaceholderRowExec {
50 pub fn new(schema: SchemaRef) -> Self {
52 let partitions = 1;
53 let cache = Self::compute_properties(Arc::clone(&schema), partitions);
54 PlaceholderRowExec {
55 schema,
56 partitions,
57 cache: Arc::new(cache),
58 }
59 }
60
61 pub fn with_partitions(mut self, partitions: usize) -> Self {
63 self.partitions = partitions;
64 let output_partitioning = Self::output_partitioning_helper(self.partitions);
66 Arc::make_mut(&mut self.cache).partitioning = output_partitioning;
67 self
68 }
69
70 fn data(&self) -> Result<Vec<RecordBatch>> {
71 Ok({
72 let n_field = self.schema.fields.len();
73 vec![RecordBatch::try_new_with_options(
74 Arc::new(Schema::new(
75 (0..n_field)
76 .map(|i| {
77 Field::new(format!("placeholder_{i}"), DataType::Null, true)
78 })
79 .collect::<Fields>(),
80 )),
81 (0..n_field)
82 .map(|_i| {
83 let ret: ArrayRef = Arc::new(NullArray::new(1));
84 ret
85 })
86 .collect(),
87 &RecordBatchOptions::new().with_row_count(Some(1)),
89 )?]
90 })
91 }
92
93 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
94 Partitioning::UnknownPartitioning(n_partitions)
95 }
96
97 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
99 PlanProperties::new(
100 EquivalenceProperties::new(schema),
101 Self::output_partitioning_helper(n_partitions),
102 EmissionType::Incremental,
103 Boundedness::Bounded,
104 )
105 .with_scheduling_type(SchedulingType::Cooperative)
106 }
107}
108
109impl DisplayAs for PlaceholderRowExec {
110 fn fmt_as(
111 &self,
112 t: DisplayFormatType,
113 f: &mut std::fmt::Formatter,
114 ) -> std::fmt::Result {
115 match t {
116 DisplayFormatType::Default | DisplayFormatType::Verbose => {
117 write!(f, "PlaceholderRowExec")
118 }
119
120 DisplayFormatType::TreeRender => Ok(()),
121 }
122 }
123}
124
125impl ExecutionPlan for PlaceholderRowExec {
126 fn name(&self) -> &'static str {
127 "PlaceholderRowExec"
128 }
129
130 fn as_any(&self) -> &dyn Any {
132 self
133 }
134
135 fn properties(&self) -> &Arc<PlanProperties> {
136 &self.cache
137 }
138
139 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
140 vec![]
141 }
142
143 fn with_new_children(
144 self: Arc<Self>,
145 _: Vec<Arc<dyn ExecutionPlan>>,
146 ) -> Result<Arc<dyn ExecutionPlan>> {
147 Ok(self)
148 }
149
150 fn execute(
151 &self,
152 partition: usize,
153 context: Arc<TaskContext>,
154 ) -> Result<SendableRecordBatchStream> {
155 trace!(
156 "Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}",
157 partition,
158 context.session_id(),
159 context.task_id()
160 );
161
162 assert_or_internal_err!(
163 partition < self.partitions,
164 "PlaceholderRowExec invalid partition {partition} (expected less than {})",
165 self.partitions
166 );
167
168 let ms = MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None)?;
169 Ok(Box::pin(cooperative(ms)))
170 }
171
172 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
173 let batches = self
174 .data()
175 .expect("Create single row placeholder RecordBatch should not fail");
176
177 let batches = match partition {
178 Some(_) => vec![batches],
179 None => vec![batches; self.partitions],
181 };
182
183 Ok(common::compute_record_batch_statistics(
184 &batches,
185 &self.schema,
186 None,
187 ))
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;
    use datafusion_common::stats::Precision;

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();

        let placeholder = Arc::new(PlaceholderRowExec::new(schema));

        let placeholder_2 = with_new_children_if_necessary(
            Arc::clone(&placeholder) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(placeholder.schema(), placeholder_2.schema());

        let too_many_kids = vec![placeholder_2];
        assert!(
            with_new_children_if_necessary(placeholder, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);

        // The default plan has a single partition, so any index >= 1 is invalid.
        assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(placeholder.execute(20, task_ctx).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(Arc::clone(&schema));

        let iter = placeholder.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;

        // Exactly one batch containing exactly one row, with one placeholder
        // column per field of the advertised schema.
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
        assert_eq!(batches[0].num_columns(), schema.fields().len());

        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        for n in 0..partitions {
            let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
            let batches = common::collect(iter).await?;

            // Every partition yields its own single placeholder row.
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 1);
        }

        Ok(())
    }

    #[test]
    fn statistics_count_one_row_per_partition() -> Result<()> {
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);

        let single = placeholder.partition_statistics(Some(0))?;
        assert_eq!(single.num_rows, Precision::Exact(1));

        let whole = placeholder.partition_statistics(None)?;
        assert_eq!(whole.num_rows, Precision::Exact(partitions));

        Ok(())
    }
}