datafusion_physical_plan/
streaming.rs1use std::any::Any;
21use std::fmt::Debug;
22use std::sync::Arc;
23
24use super::{DisplayAs, DisplayFormatType, PlanProperties};
25use crate::coop::make_cooperative;
26use crate::display::{display_orderings, ProjectSchemaDisplay};
27use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
28use crate::limit::LimitStream;
29use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
30use crate::projection::{
31 all_alias_free_columns, new_projections_for_columns, update_ordering, ProjectionExec,
32};
33use crate::stream::RecordBatchStreamAdapter;
34use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
35
36use arrow::datatypes::{Schema, SchemaRef};
37use datafusion_common::{internal_err, plan_err, Result};
38use datafusion_execution::TaskContext;
39use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
40
41use async_trait::async_trait;
42use futures::stream::StreamExt;
43use log::debug;
44
/// A source of record batches for one partition of a [`StreamingTableExec`].
///
/// Implementors must be `Debug + Send + Sync` because they are stored as
/// `Arc<dyn PartitionStream>` trait objects inside the execution plan.
pub trait PartitionStream: Debug + Send + Sync {
    /// Returns the schema of the batches produced by this partition.
    ///
    /// [`StreamingTableExec::try_new`] rejects any partition whose schema is
    /// not equal to the table schema it is given.
    fn schema(&self) -> &SchemaRef;

    /// Returns the stream of record batches for this partition, using the
    /// supplied task context.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}
57
/// An [`ExecutionPlan`] that reads from a set of [`PartitionStream`]s,
/// optionally applying a column projection and a row limit.
#[derive(Clone)]
pub struct StreamingTableExec {
    /// One stream source per output partition.
    partitions: Vec<Arc<dyn PartitionStream>>,
    /// Column indices to keep from each batch, or `None` to keep all columns.
    projection: Option<Arc<[usize]>>,
    /// Schema after `projection` has been applied (the input schema when
    /// there is no projection).
    projected_schema: SchemaRef,
    /// Output orderings, expressed against `projected_schema`.
    projected_output_ordering: Vec<LexOrdering>,
    /// Whether the source is unbounded.
    infinite: bool,
    /// Optional maximum number of rows to emit per executed partition.
    limit: Option<usize>,
    /// Plan properties computed once at construction time.
    cache: PlanProperties,
    /// Metrics set used to register per-partition baseline metrics when a
    /// limit is applied.
    metrics: ExecutionPlanMetricsSet,
}
73
74impl StreamingTableExec {
75 pub fn try_new(
77 schema: SchemaRef,
78 partitions: Vec<Arc<dyn PartitionStream>>,
79 projection: Option<&Vec<usize>>,
80 projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
81 infinite: bool,
82 limit: Option<usize>,
83 ) -> Result<Self> {
84 for x in partitions.iter() {
85 let partition_schema = x.schema();
86 if !schema.eq(partition_schema) {
87 debug!(
88 "Target schema does not match with partition schema. \
89 Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
90 );
91 return plan_err!("Mismatch between schema and batches");
92 }
93 }
94
95 let projected_schema = match projection {
96 Some(p) => Arc::new(schema.project(p)?),
97 None => schema,
98 };
99 let projected_output_ordering =
100 projected_output_ordering.into_iter().collect::<Vec<_>>();
101 let cache = Self::compute_properties(
102 Arc::clone(&projected_schema),
103 projected_output_ordering.clone(),
104 &partitions,
105 infinite,
106 );
107 Ok(Self {
108 partitions,
109 projected_schema,
110 projection: projection.cloned().map(Into::into),
111 projected_output_ordering,
112 infinite,
113 limit,
114 cache,
115 metrics: ExecutionPlanMetricsSet::new(),
116 })
117 }
118
119 pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
120 &self.partitions
121 }
122
123 pub fn partition_schema(&self) -> &SchemaRef {
124 self.partitions[0].schema()
125 }
126
127 pub fn projection(&self) -> &Option<Arc<[usize]>> {
128 &self.projection
129 }
130
131 pub fn projected_schema(&self) -> &Schema {
132 &self.projected_schema
133 }
134
135 pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
136 self.projected_output_ordering.clone()
137 }
138
139 pub fn is_infinite(&self) -> bool {
140 self.infinite
141 }
142
143 pub fn limit(&self) -> Option<usize> {
144 self.limit
145 }
146
147 fn compute_properties(
149 schema: SchemaRef,
150 orderings: Vec<LexOrdering>,
151 partitions: &[Arc<dyn PartitionStream>],
152 infinite: bool,
153 ) -> PlanProperties {
154 let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
156
157 let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
159 let boundedness = if infinite {
160 Boundedness::Unbounded {
161 requires_infinite_memory: false,
162 }
163 } else {
164 Boundedness::Bounded
165 };
166 PlanProperties::new(
167 eq_properties,
168 output_partitioning,
169 EmissionType::Incremental,
170 boundedness,
171 )
172 .with_scheduling_type(SchedulingType::Cooperative)
173 }
174}
175
176impl Debug for StreamingTableExec {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
179 }
180}
181
182impl DisplayAs for StreamingTableExec {
183 fn fmt_as(
184 &self,
185 t: DisplayFormatType,
186 f: &mut std::fmt::Formatter,
187 ) -> std::fmt::Result {
188 match t {
189 DisplayFormatType::Default | DisplayFormatType::Verbose => {
190 write!(
191 f,
192 "StreamingTableExec: partition_sizes={:?}",
193 self.partitions.len(),
194 )?;
195 if !self.projected_schema.fields().is_empty() {
196 write!(
197 f,
198 ", projection={}",
199 ProjectSchemaDisplay(&self.projected_schema)
200 )?;
201 }
202 if self.infinite {
203 write!(f, ", infinite_source=true")?;
204 }
205 if let Some(fetch) = self.limit {
206 write!(f, ", fetch={fetch}")?;
207 }
208
209 display_orderings(f, &self.projected_output_ordering)?;
210
211 Ok(())
212 }
213 DisplayFormatType::TreeRender => {
214 if self.infinite {
215 writeln!(f, "infinite={}", self.infinite)?;
216 }
217 if let Some(limit) = self.limit {
218 write!(f, "limit={limit}")?;
219 } else {
220 write!(f, "limit=None")?;
221 }
222
223 Ok(())
224 }
225 }
226 }
227}
228
229#[async_trait]
230impl ExecutionPlan for StreamingTableExec {
231 fn name(&self) -> &'static str {
232 "StreamingTableExec"
233 }
234
235 fn as_any(&self) -> &dyn Any {
236 self
237 }
238
239 fn properties(&self) -> &PlanProperties {
240 &self.cache
241 }
242
243 fn fetch(&self) -> Option<usize> {
244 self.limit
245 }
246
247 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
248 vec![]
249 }
250
251 fn with_new_children(
252 self: Arc<Self>,
253 children: Vec<Arc<dyn ExecutionPlan>>,
254 ) -> Result<Arc<dyn ExecutionPlan>> {
255 if children.is_empty() {
256 Ok(self)
257 } else {
258 internal_err!("Children cannot be replaced in {self:?}")
259 }
260 }
261
262 fn execute(
263 &self,
264 partition: usize,
265 ctx: Arc<TaskContext>,
266 ) -> Result<SendableRecordBatchStream> {
267 let stream = self.partitions[partition].execute(Arc::clone(&ctx));
268 let projected_stream = match self.projection.clone() {
269 Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
270 Arc::clone(&self.projected_schema),
271 stream.map(move |x| {
272 x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
273 }),
274 )),
275 None => stream,
276 };
277 let stream = make_cooperative(projected_stream);
278
279 Ok(match self.limit {
280 None => stream,
281 Some(fetch) => {
282 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
283 Box::pin(LimitStream::new(stream, 0, Some(fetch), baseline_metrics))
284 }
285 })
286 }
287
288 fn try_swapping_with_projection(
292 &self,
293 projection: &ProjectionExec,
294 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
295 if !all_alias_free_columns(projection.expr()) {
296 return Ok(None);
297 }
298
299 let streaming_table_projections =
300 self.projection().as_ref().map(|i| i.as_ref().to_vec());
301 let new_projections = new_projections_for_columns(
302 projection.expr(),
303 &streaming_table_projections
304 .unwrap_or_else(|| (0..self.schema().fields().len()).collect()),
305 );
306
307 let mut lex_orderings = vec![];
308 for ordering in self.projected_output_ordering().into_iter() {
309 let Some(ordering) = update_ordering(ordering, projection.expr())? else {
310 return Ok(None);
311 };
312 lex_orderings.push(ordering);
313 }
314
315 StreamingTableExec::try_new(
316 Arc::clone(self.partition_schema()),
317 self.partitions().clone(),
318 Some(new_projections.as_ref()),
319 lex_orderings,
320 self.is_infinite(),
321 self.limit(),
322 )
323 .map(|e| Some(Arc::new(e) as _))
324 }
325
326 fn metrics(&self) -> Option<MetricsSet> {
327 Some(self.metrics.clone_inner())
328 }
329
330 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
331 Some(Arc::new(StreamingTableExec {
332 partitions: self.partitions.clone(),
333 projection: self.projection.clone(),
334 projected_schema: Arc::clone(&self.projected_schema),
335 projected_output_ordering: self.projected_output_ordering.clone(),
336 infinite: self.infinite,
337 limit,
338 cache: self.cache.clone(),
339 metrics: self.metrics.clone(),
340 }))
341 }
342}
343
#[cfg(test)]
mod test {
    use super::*;
    use crate::collect_partitioned;
    use crate::streaming::PartitionStream;
    use crate::test::{make_partition, TestPartitionStream};
    use arrow::record_batch::RecordBatch;

    /// Without a limit, every row of both batches is returned.
    #[tokio::test]
    async fn test_no_limit() {
        let batches = vec![make_partition(100), make_partition(100)];
        let plan = TestBuilder::new().with_batches(batches).build();

        let counts = collect_num_rows(Arc::new(plan)).await;
        assert_eq!(counts, vec![200]);
    }

    /// With a limit of 75, only 75 of the 200 available rows are returned.
    #[tokio::test]
    async fn test_limit() {
        let batches = vec![make_partition(100), make_partition(100)];
        let plan = TestBuilder::new()
            .with_batches(batches)
            .with_limit(Some(75))
            .build();

        let counts = collect_num_rows(Arc::new(plan)).await;
        assert_eq!(counts, vec![75]);
    }

    /// Executes every partition of `exec` and returns the total number of
    /// rows produced by each one, in partition order.
    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
        let task_ctx = Arc::new(TaskContext::default());
        let per_partition = collect_partitioned(exec, task_ctx).await.unwrap();

        let mut row_counts: Vec<usize> = Vec::with_capacity(per_partition.len());
        for batches in &per_partition {
            let rows = batches.iter().map(RecordBatch::num_rows).sum::<usize>();
            row_counts.push(rows);
        }
        row_counts
    }

    /// Builder that assembles a [`StreamingTableExec`] for tests; every
    /// setting starts at its `Default` value.
    #[derive(Default)]
    struct TestBuilder {
        schema: Option<SchemaRef>,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<Vec<usize>>,
        projected_output_ordering: Vec<LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    }

    impl TestBuilder {
        /// Creates a builder with all settings at their defaults.
        fn new() -> Self {
            Self::default()
        }

        /// Uses `batches` as the contents of a single partition and adopts
        /// that partition's schema as the table schema.
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            let source = TestPartitionStream::new_with_batches(batches);
            let schema = Arc::clone(source.schema());
            self.partitions = vec![Arc::new(source)];
            self.schema = Some(schema);
            self
        }

        /// Sets the row limit applied by the plan.
        fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Builds the plan; panics if no schema was set or if construction
        /// fails.
        fn build(self) -> StreamingTableExec {
            let Self {
                schema,
                partitions,
                projection,
                projected_output_ordering,
                infinite,
                limit,
            } = self;

            StreamingTableExec::try_new(
                schema.unwrap(),
                partitions,
                projection.as_ref(),
                projected_output_ordering,
                infinite,
                limit,
            )
            .unwrap()
        }
    }
}