Skip to main content

datafusion_physical_plan/
streaming.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Generic plans for deferred execution: [`StreamingTableExec`] and [`PartitionStream`]
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use super::{DisplayAs, DisplayFormatType, PlanProperties};
24use crate::coop::make_cooperative;
25use crate::display::{ProjectSchemaDisplay, display_orderings};
26use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
27use crate::limit::LimitStream;
28use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
29use crate::projection::{
30    ProjectionExec, all_alias_free_columns, new_projections_for_columns, update_ordering,
31};
32use crate::stream::RecordBatchStreamAdapter;
33use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
34
35use arrow::datatypes::{Schema, SchemaRef};
36use datafusion_common::{Result, internal_err, plan_err};
37use datafusion_execution::TaskContext;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
39
40use async_trait::async_trait;
41use futures::stream::StreamExt;
42use log::debug;
43
44/// A partition that can be converted into a [`SendableRecordBatchStream`]
45///
46/// Combined with [`StreamingTableExec`], you can use this trait to implement
47/// [`ExecutionPlan`] for a custom source with less boiler plate than
48/// implementing `ExecutionPlan` directly for many use cases.
49pub trait PartitionStream: Debug + Send + Sync {
50    /// Returns the schema of this partition
51    fn schema(&self) -> &SchemaRef;
52
53    /// Returns a stream yielding this partitions values
54    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
55}
56
57/// An [`ExecutionPlan`] for one or more [`PartitionStream`]s.
58///
59/// If your source can be represented as one or more [`PartitionStream`]s, you can
60/// use this struct to implement [`ExecutionPlan`].
61#[derive(Clone)]
62pub struct StreamingTableExec {
63    partitions: Vec<Arc<dyn PartitionStream>>,
64    projection: Option<Arc<[usize]>>,
65    projected_schema: SchemaRef,
66    projected_output_ordering: Vec<LexOrdering>,
67    infinite: bool,
68    limit: Option<usize>,
69    cache: Arc<PlanProperties>,
70    metrics: ExecutionPlanMetricsSet,
71}
72
73impl StreamingTableExec {
74    /// Try to create a new [`StreamingTableExec`] returning an error if the schema is incorrect
75    pub fn try_new(
76        schema: SchemaRef,
77        partitions: Vec<Arc<dyn PartitionStream>>,
78        projection: Option<&Vec<usize>>,
79        projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
80        infinite: bool,
81        limit: Option<usize>,
82    ) -> Result<Self> {
83        for x in partitions.iter() {
84            let partition_schema = x.schema();
85            if !schema.eq(partition_schema) {
86                debug!(
87                    "Target schema does not match with partition schema. \
88                        Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
89                );
90                return plan_err!("Mismatch between schema and batches");
91            }
92        }
93
94        let projected_schema = match projection {
95            Some(p) => Arc::new(schema.project(p)?),
96            None => schema,
97        };
98        let projected_output_ordering =
99            projected_output_ordering.into_iter().collect::<Vec<_>>();
100        let cache = Self::compute_properties(
101            Arc::clone(&projected_schema),
102            projected_output_ordering.clone(),
103            &partitions,
104            infinite,
105        );
106        Ok(Self {
107            partitions,
108            projected_schema,
109            projection: projection.cloned().map(Into::into),
110            projected_output_ordering,
111            infinite,
112            limit,
113            cache: Arc::new(cache),
114            metrics: ExecutionPlanMetricsSet::new(),
115        })
116    }
117
118    pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
119        &self.partitions
120    }
121
122    pub fn partition_schema(&self) -> &SchemaRef {
123        self.partitions[0].schema()
124    }
125
126    pub fn projection(&self) -> &Option<Arc<[usize]>> {
127        &self.projection
128    }
129
130    pub fn projected_schema(&self) -> &Schema {
131        &self.projected_schema
132    }
133
134    pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
135        self.projected_output_ordering.clone()
136    }
137
138    pub fn is_infinite(&self) -> bool {
139        self.infinite
140    }
141
142    pub fn limit(&self) -> Option<usize> {
143        self.limit
144    }
145
146    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
147    fn compute_properties(
148        schema: SchemaRef,
149        orderings: Vec<LexOrdering>,
150        partitions: &[Arc<dyn PartitionStream>],
151        infinite: bool,
152    ) -> PlanProperties {
153        // Calculate equivalence properties:
154        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
155
156        // Get output partitioning:
157        let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
158        let boundedness = if infinite {
159            Boundedness::Unbounded {
160                requires_infinite_memory: false,
161            }
162        } else {
163            Boundedness::Bounded
164        };
165        PlanProperties::new(
166            eq_properties,
167            output_partitioning,
168            EmissionType::Incremental,
169            boundedness,
170        )
171        .with_scheduling_type(SchedulingType::Cooperative)
172    }
173}
174
175impl Debug for StreamingTableExec {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
178    }
179}
180
181impl DisplayAs for StreamingTableExec {
182    fn fmt_as(
183        &self,
184        t: DisplayFormatType,
185        f: &mut std::fmt::Formatter,
186    ) -> std::fmt::Result {
187        match t {
188            DisplayFormatType::Default | DisplayFormatType::Verbose => {
189                write!(
190                    f,
191                    "StreamingTableExec: partition_sizes={:?}",
192                    self.partitions.len(),
193                )?;
194                if !self.projected_schema.fields().is_empty() {
195                    write!(
196                        f,
197                        ", projection={}",
198                        ProjectSchemaDisplay(&self.projected_schema)
199                    )?;
200                }
201                if self.infinite {
202                    write!(f, ", infinite_source=true")?;
203                }
204                if let Some(fetch) = self.limit {
205                    write!(f, ", fetch={fetch}")?;
206                }
207
208                display_orderings(f, &self.projected_output_ordering)?;
209
210                Ok(())
211            }
212            DisplayFormatType::TreeRender => {
213                if self.infinite {
214                    writeln!(f, "infinite={}", self.infinite)?;
215                }
216                if let Some(limit) = self.limit {
217                    write!(f, "limit={limit}")?;
218                } else {
219                    write!(f, "limit=None")?;
220                }
221
222                Ok(())
223            }
224        }
225    }
226}
227
228#[async_trait]
229impl ExecutionPlan for StreamingTableExec {
230    fn name(&self) -> &'static str {
231        "StreamingTableExec"
232    }
233
234    fn properties(&self) -> &Arc<PlanProperties> {
235        &self.cache
236    }
237
238    fn fetch(&self) -> Option<usize> {
239        self.limit
240    }
241
242    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
243        vec![]
244    }
245
246    fn with_new_children(
247        self: Arc<Self>,
248        children: Vec<Arc<dyn ExecutionPlan>>,
249    ) -> Result<Arc<dyn ExecutionPlan>> {
250        if children.is_empty() {
251            Ok(self)
252        } else {
253            internal_err!("Children cannot be replaced in {self:?}")
254        }
255    }
256
257    fn execute(
258        &self,
259        partition: usize,
260        ctx: Arc<TaskContext>,
261    ) -> Result<SendableRecordBatchStream> {
262        let stream = self.partitions[partition].execute(Arc::clone(&ctx));
263        let projected_stream = match self.projection.clone() {
264            Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
265                Arc::clone(&self.projected_schema),
266                stream.map(move |x| {
267                    x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
268                }),
269            )),
270            None => stream,
271        };
272        let stream = make_cooperative(projected_stream);
273
274        Ok(match self.limit {
275            None => stream,
276            Some(fetch) => {
277                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
278                Box::pin(LimitStream::new(stream, 0, Some(fetch), baseline_metrics))
279            }
280        })
281    }
282
283    /// Tries to embed `projection` to its input (`streaming table`).
284    /// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
285    /// returns `None`.
286    fn try_swapping_with_projection(
287        &self,
288        projection: &ProjectionExec,
289    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
290        if !all_alias_free_columns(projection.expr()) {
291            return Ok(None);
292        }
293
294        let streaming_table_projections =
295            self.projection().as_ref().map(|i| i.as_ref().to_vec());
296        let new_projections = new_projections_for_columns(
297            projection.expr(),
298            &streaming_table_projections
299                .unwrap_or_else(|| (0..self.schema().fields().len()).collect()),
300        );
301
302        let mut lex_orderings = vec![];
303        for ordering in self.projected_output_ordering().into_iter() {
304            let Some(ordering) = update_ordering(ordering, projection.expr())? else {
305                return Ok(None);
306            };
307            lex_orderings.push(ordering);
308        }
309
310        StreamingTableExec::try_new(
311            Arc::clone(self.partition_schema()),
312            self.partitions().clone(),
313            Some(new_projections.as_ref()),
314            lex_orderings,
315            self.is_infinite(),
316            self.limit(),
317        )
318        .map(|e| Some(Arc::new(e) as _))
319    }
320
321    fn metrics(&self) -> Option<MetricsSet> {
322        Some(self.metrics.clone_inner())
323    }
324
325    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
326        Some(Arc::new(StreamingTableExec {
327            partitions: self.partitions.clone(),
328            projection: self.projection.clone(),
329            projected_schema: Arc::clone(&self.projected_schema),
330            projected_output_ordering: self.projected_output_ordering.clone(),
331            infinite: self.infinite,
332            limit,
333            cache: Arc::clone(&self.cache),
334            metrics: self.metrics.clone(),
335        }))
336    }
337}
338
#[cfg(test)]
mod test {
    use super::*;
    use crate::collect_partitioned;
    use crate::streaming::PartitionStream;
    use crate::test::{TestPartitionStream, make_partition};
    use arrow::record_batch::RecordBatch;

    /// Two 100-row batches, i.e. 200 rows in total for a single partition.
    fn two_hundred_rows() -> Vec<RecordBatch> {
        vec![make_partition(100), make_partition(100)]
    }

    #[tokio::test]
    async fn test_no_limit() {
        let exec = TestBuilder::new().with_batches(two_hundred_rows()).build();

        assert_eq!(collect_num_rows(Arc::new(exec)).await, vec![200]);
    }

    #[tokio::test]
    async fn test_limit() {
        // A fetch of 75 must truncate the 200 available rows.
        let exec = TestBuilder::new()
            .with_batches(two_hundred_rows())
            .with_limit(Some(75))
            .build();

        assert_eq!(collect_num_rows(Arc::new(exec)).await, vec![75]);
    }

    /// Executes `exec` and returns the total number of rows produced by each
    /// output partition, in partition order.
    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
        let task_ctx = Arc::new(TaskContext::default());
        collect_partitioned(exec, task_ctx)
            .await
            .unwrap()
            .into_iter()
            .map(|batches| batches.iter().map(RecordBatch::num_rows).sum::<usize>())
            .collect()
    }

    /// Small builder for [`StreamingTableExec`] instances used by these tests.
    #[derive(Default)]
    struct TestBuilder {
        schema: Option<SchemaRef>,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<Vec<usize>>,
        projected_output_ordering: Vec<LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    }

    impl TestBuilder {
        fn new() -> Self {
            Self::default()
        }

        /// Uses a single partition that replays `batches`; its schema becomes
        /// the table schema.
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            let stream = TestPartitionStream::new_with_batches(batches);
            self.schema = Some(Arc::clone(stream.schema()));
            self.partitions = vec![Arc::new(stream)];
            self
        }

        /// Sets the per-partition fetch limit.
        fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Builds the plan; panics if no batches were configured.
        fn build(self) -> StreamingTableExec {
            let Self {
                schema,
                partitions,
                projection,
                projected_output_ordering,
                infinite,
                limit,
            } = self;
            StreamingTableExec::try_new(
                schema.unwrap(),
                partitions,
                projection.as_ref(),
                projected_output_ordering,
                infinite,
                limit,
            )
            .unwrap()
        }
    }
}