datafusion_physical_plan/
streaming.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Generic plans for deferred execution: [`StreamingTableExec`] and [`PartitionStream`]
19
20use std::any::Any;
21use std::fmt::Debug;
22use std::sync::Arc;
23
24use super::{DisplayAs, DisplayFormatType, PlanProperties};
25use crate::display::{display_orderings, ProjectSchemaDisplay};
26use crate::execution_plan::{Boundedness, EmissionType};
27use crate::limit::LimitStream;
28use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
29use crate::projection::{
30    all_alias_free_columns, new_projections_for_columns, update_expr, ProjectionExec,
31};
32use crate::stream::RecordBatchStreamAdapter;
33use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
34
35use arrow::datatypes::{Schema, SchemaRef};
36use datafusion_common::{internal_err, plan_err, Result};
37use datafusion_execution::TaskContext;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
39
40use async_trait::async_trait;
41use futures::stream::StreamExt;
42use log::debug;
43
/// A single partition that can be turned into a [`SendableRecordBatchStream`].
///
/// Combined with [`StreamingTableExec`], this trait lets a custom source act
/// as an [`ExecutionPlan`] with less boilerplate than implementing
/// `ExecutionPlan` directly.
///
/// [`StreamingTableExec::try_new`] rejects any partition whose
/// [`schema`](Self::schema) differs from the schema of the table it is
/// registered under.
pub trait PartitionStream: Debug + Send + Sync {
    /// Returns the schema of this partition.
    fn schema(&self) -> &SchemaRef;

    /// Returns a stream yielding this partition's values.
    ///
    /// `ctx` is the task context handed to [`ExecutionPlan::execute`] by the
    /// caller; [`StreamingTableExec`] forwards it unchanged.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}
56
/// An [`ExecutionPlan`] for one or more [`PartitionStream`]s.
///
/// If your source can be represented as one or more [`PartitionStream`]s, you can
/// use this struct to implement [`ExecutionPlan`].
#[derive(Clone)]
pub struct StreamingTableExec {
    /// Source partitions; output partition `i` is produced by `partitions[i]`.
    partitions: Vec<Arc<dyn PartitionStream>>,
    /// Column indices applied to every batch, or `None` to emit all columns.
    projection: Option<Arc<[usize]>>,
    /// Schema of the emitted batches, i.e. the table schema after `projection`.
    projected_schema: SchemaRef,
    /// Orderings the output is declared to satisfy; they feed the
    /// equivalence properties stored in `cache`.
    projected_output_ordering: Vec<LexOrdering>,
    /// Whether the source is unbounded; selects the plan's `Boundedness`.
    infinite: bool,
    /// Optional row limit, applied independently to each partition's stream.
    limit: Option<usize>,
    /// Plan properties computed once at construction time.
    cache: PlanProperties,
    /// Metrics set that per-partition baseline metrics are registered in.
    metrics: ExecutionPlanMetricsSet,
}
72
73impl StreamingTableExec {
74    /// Try to create a new [`StreamingTableExec`] returning an error if the schema is incorrect
75    pub fn try_new(
76        schema: SchemaRef,
77        partitions: Vec<Arc<dyn PartitionStream>>,
78        projection: Option<&Vec<usize>>,
79        projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
80        infinite: bool,
81        limit: Option<usize>,
82    ) -> Result<Self> {
83        for x in partitions.iter() {
84            let partition_schema = x.schema();
85            if !schema.eq(partition_schema) {
86                debug!(
87                    "Target schema does not match with partition schema. \
88                        Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
89                );
90                return plan_err!("Mismatch between schema and batches");
91            }
92        }
93
94        let projected_schema = match projection {
95            Some(p) => Arc::new(schema.project(p)?),
96            None => schema,
97        };
98        let projected_output_ordering =
99            projected_output_ordering.into_iter().collect::<Vec<_>>();
100        let cache = Self::compute_properties(
101            Arc::clone(&projected_schema),
102            &projected_output_ordering,
103            &partitions,
104            infinite,
105        );
106        Ok(Self {
107            partitions,
108            projected_schema,
109            projection: projection.cloned().map(Into::into),
110            projected_output_ordering,
111            infinite,
112            limit,
113            cache,
114            metrics: ExecutionPlanMetricsSet::new(),
115        })
116    }
117
118    pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
119        &self.partitions
120    }
121
122    pub fn partition_schema(&self) -> &SchemaRef {
123        self.partitions[0].schema()
124    }
125
126    pub fn projection(&self) -> &Option<Arc<[usize]>> {
127        &self.projection
128    }
129
130    pub fn projected_schema(&self) -> &Schema {
131        &self.projected_schema
132    }
133
134    pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
135        self.projected_output_ordering.clone()
136    }
137
138    pub fn is_infinite(&self) -> bool {
139        self.infinite
140    }
141
142    pub fn limit(&self) -> Option<usize> {
143        self.limit
144    }
145
146    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
147    fn compute_properties(
148        schema: SchemaRef,
149        orderings: &[LexOrdering],
150        partitions: &[Arc<dyn PartitionStream>],
151        infinite: bool,
152    ) -> PlanProperties {
153        // Calculate equivalence properties:
154        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
155
156        // Get output partitioning:
157        let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
158        let boundedness = if infinite {
159            Boundedness::Unbounded {
160                requires_infinite_memory: false,
161            }
162        } else {
163            Boundedness::Bounded
164        };
165        PlanProperties::new(
166            eq_properties,
167            output_partitioning,
168            EmissionType::Incremental,
169            boundedness,
170        )
171    }
172}
173
174impl Debug for StreamingTableExec {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
177    }
178}
179
180impl DisplayAs for StreamingTableExec {
181    fn fmt_as(
182        &self,
183        t: DisplayFormatType,
184        f: &mut std::fmt::Formatter,
185    ) -> std::fmt::Result {
186        match t {
187            DisplayFormatType::Default | DisplayFormatType::Verbose => {
188                write!(
189                    f,
190                    "StreamingTableExec: partition_sizes={:?}",
191                    self.partitions.len(),
192                )?;
193                if !self.projected_schema.fields().is_empty() {
194                    write!(
195                        f,
196                        ", projection={}",
197                        ProjectSchemaDisplay(&self.projected_schema)
198                    )?;
199                }
200                if self.infinite {
201                    write!(f, ", infinite_source=true")?;
202                }
203                if let Some(fetch) = self.limit {
204                    write!(f, ", fetch={fetch}")?;
205                }
206
207                display_orderings(f, &self.projected_output_ordering)?;
208
209                Ok(())
210            }
211            DisplayFormatType::TreeRender => {
212                if self.infinite {
213                    writeln!(f, "infinite={}", self.infinite)?;
214                }
215                if let Some(limit) = self.limit {
216                    write!(f, "limit={limit}")?;
217                } else {
218                    write!(f, "limit=None")?;
219                }
220
221                Ok(())
222            }
223        }
224    }
225}
226
227#[async_trait]
228impl ExecutionPlan for StreamingTableExec {
229    fn name(&self) -> &'static str {
230        "StreamingTableExec"
231    }
232
233    fn as_any(&self) -> &dyn Any {
234        self
235    }
236
237    fn properties(&self) -> &PlanProperties {
238        &self.cache
239    }
240
241    fn fetch(&self) -> Option<usize> {
242        self.limit
243    }
244
245    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
246        vec![]
247    }
248
249    fn with_new_children(
250        self: Arc<Self>,
251        children: Vec<Arc<dyn ExecutionPlan>>,
252    ) -> Result<Arc<dyn ExecutionPlan>> {
253        if children.is_empty() {
254            Ok(self)
255        } else {
256            internal_err!("Children cannot be replaced in {self:?}")
257        }
258    }
259
260    fn execute(
261        &self,
262        partition: usize,
263        ctx: Arc<TaskContext>,
264    ) -> Result<SendableRecordBatchStream> {
265        let stream = self.partitions[partition].execute(ctx);
266        let projected_stream = match self.projection.clone() {
267            Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
268                Arc::clone(&self.projected_schema),
269                stream.map(move |x| {
270                    x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
271                }),
272            )),
273            None => stream,
274        };
275        Ok(match self.limit {
276            None => projected_stream,
277            Some(fetch) => {
278                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
279                Box::pin(LimitStream::new(
280                    projected_stream,
281                    0,
282                    Some(fetch),
283                    baseline_metrics,
284                ))
285            }
286        })
287    }
288
289    /// Tries to embed `projection` to its input (`streaming table`).
290    /// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
291    /// returns `None`.
292    fn try_swapping_with_projection(
293        &self,
294        projection: &ProjectionExec,
295    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
296        if !all_alias_free_columns(projection.expr()) {
297            return Ok(None);
298        }
299
300        let streaming_table_projections =
301            self.projection().as_ref().map(|i| i.as_ref().to_vec());
302        let new_projections = new_projections_for_columns(
303            projection,
304            &streaming_table_projections
305                .unwrap_or_else(|| (0..self.schema().fields().len()).collect()),
306        );
307
308        let mut lex_orderings = vec![];
309        for lex_ordering in self.projected_output_ordering().into_iter() {
310            let mut orderings = LexOrdering::default();
311            for order in lex_ordering {
312                let Some(new_ordering) =
313                    update_expr(&order.expr, projection.expr(), false)?
314                else {
315                    return Ok(None);
316                };
317                orderings.push(PhysicalSortExpr {
318                    expr: new_ordering,
319                    options: order.options,
320                });
321            }
322            lex_orderings.push(orderings);
323        }
324
325        StreamingTableExec::try_new(
326            Arc::clone(self.partition_schema()),
327            self.partitions().clone(),
328            Some(new_projections.as_ref()),
329            lex_orderings,
330            self.is_infinite(),
331            self.limit(),
332        )
333        .map(|e| Some(Arc::new(e) as _))
334    }
335
336    fn metrics(&self) -> Option<MetricsSet> {
337        Some(self.metrics.clone_inner())
338    }
339
340    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
341        Some(Arc::new(StreamingTableExec {
342            partitions: self.partitions.clone(),
343            projection: self.projection.clone(),
344            projected_schema: Arc::clone(&self.projected_schema),
345            projected_output_ordering: self.projected_output_ordering.clone(),
346            infinite: self.infinite,
347            limit,
348            cache: self.cache.clone(),
349            metrics: self.metrics.clone(),
350        }))
351    }
352}
353
#[cfg(test)]
mod test {
    use super::*;
    use crate::collect_partitioned;
    use crate::streaming::PartitionStream;
    use crate::test::{make_partition, TestPartitionStream};
    use arrow::record_batch::RecordBatch;

    /// Without a limit, every row of the single partition is returned.
    #[tokio::test]
    async fn test_no_limit() {
        // Make 2 batches, each with 100 rows
        let batches = vec![make_partition(100), make_partition(100)];
        let exec = TestBuilder::new().with_batches(batches).build();

        let counts = collect_num_rows(Arc::new(exec)).await;
        assert_eq!(counts, vec![200]);
    }

    /// With a limit, the partition's output is cut off at that many rows.
    #[tokio::test]
    async fn test_limit() {
        // Make 2 batches, each with 100 rows
        let batches = vec![make_partition(100), make_partition(100)];
        let exec = TestBuilder::new()
            .with_batches(batches)
            // Limit to only the first 75 rows back
            .with_limit(Some(75))
            .build();

        let counts = collect_num_rows(Arc::new(exec)).await;
        assert_eq!(counts, vec![75]);
    }

    /// Executes `exec` to completion and returns, for each partition, the
    /// total number of rows it produced.
    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
        let ctx = Arc::new(TaskContext::default());
        let per_partition = collect_partitioned(exec, ctx).await.unwrap();

        let mut counts: Vec<usize> = Vec::with_capacity(per_partition.len());
        for batches in per_partition {
            counts.push(batches.iter().map(RecordBatch::num_rows).sum::<usize>());
        }
        counts
    }

    /// Test helper collecting the arguments of [`StreamingTableExec::try_new`].
    #[derive(Default)]
    struct TestBuilder {
        schema: Option<SchemaRef>,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<Vec<usize>>,
        projected_output_ordering: Vec<LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    }

    impl TestBuilder {
        /// Starts from all-default settings (no partitions, no limit).
        fn new() -> Self {
            Self::default()
        }

        /// Uses `batches` as the content of a single partition and adopts
        /// that partition's schema as the table schema.
        fn with_batches(self, batches: Vec<RecordBatch>) -> Self {
            let stream = TestPartitionStream::new_with_batches(batches);
            Self {
                schema: Some(Arc::clone(stream.schema())),
                partitions: vec![Arc::new(stream)],
                ..self
            }
        }

        /// Sets the row limit of the plan under test.
        fn with_limit(self, limit: Option<usize>) -> Self {
            Self { limit, ..self }
        }

        /// Builds the plan, panicking if no batches were provided or if
        /// construction fails.
        fn build(self) -> StreamingTableExec {
            let Self {
                schema,
                partitions,
                projection,
                projected_output_ordering,
                infinite,
                limit,
            } = self;

            StreamingTableExec::try_new(
                schema.unwrap(),
                partitions,
                projection.as_ref(),
                projected_output_ordering,
                infinite,
                limit,
            )
            .unwrap()
        }
    }
}