// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Generic plans for deferred execution: [`StreamingTableExec`] and [`PartitionStream`]

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use super::{DisplayAs, DisplayFormatType, PlanProperties};
use crate::coop::make_cooperative;
use crate::display::{display_orderings, ProjectSchemaDisplay};
use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
use crate::limit::LimitStream;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::projection::{
    all_alias_free_columns, new_projections_for_columns, update_ordering, ProjectionExec,
};
use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};

use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{internal_err, plan_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use async_trait::async_trait;
use futures::stream::StreamExt;
use log::debug;

/// A partition that can be converted into a [`SendableRecordBatchStream`]
///
/// Combined with [`StreamingTableExec`], this trait lets you implement
/// [`ExecutionPlan`] for a custom source with, in many cases, less
/// boilerplate than implementing `ExecutionPlan` directly.
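///
/// # Example
///
/// A minimal sketch of a source that replays in-memory batches. The
/// `ReplayStream` type and its pre-collected `batches` field are
/// illustrative, not part of this crate:
///
/// ```
/// use std::sync::Arc;
///
/// use arrow::datatypes::SchemaRef;
/// use arrow::record_batch::RecordBatch;
/// use datafusion_common::Result;
/// use datafusion_execution::TaskContext;
/// use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// use datafusion_physical_plan::streaming::PartitionStream;
/// use datafusion_physical_plan::SendableRecordBatchStream;
///
/// #[derive(Debug)]
/// struct ReplayStream {
///     schema: SchemaRef,
///     batches: Vec<RecordBatch>,
/// }
///
/// impl PartitionStream for ReplayStream {
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
///         // Wrap the batches in an adapter that implements RecordBatchStream
///         let batches: Vec<Result<RecordBatch>> =
///             self.batches.clone().into_iter().map(Ok).collect();
///         let stream = futures::stream::iter(batches);
///         Box::pin(RecordBatchStreamAdapter::new(Arc::clone(&self.schema), stream))
///     }
/// }
/// ```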
pub trait PartitionStream: Debug + Send + Sync {
    /// Returns the schema of this partition
    fn schema(&self) -> &SchemaRef;

    /// Returns a stream yielding this partition's values
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}

/// An [`ExecutionPlan`] for one or more [`PartitionStream`]s.
///
/// If your source can be represented as one or more [`PartitionStream`]s, you can
/// use this struct to implement [`ExecutionPlan`].
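///
/// # Example
///
/// A sketch of wrapping two partition streams. `schema`, `stream_a`, and
/// `stream_b` are assumed to exist, with both streams implementing
/// [`PartitionStream`] over the same schema:
///
/// ```ignore
/// let exec = StreamingTableExec::try_new(
///     schema,                   // SchemaRef shared by every partition
///     vec![stream_a, stream_b], // one stream per output partition
///     None,                     // no projection: emit all columns
///     vec![],                   // no known output orderings
///     false,                    // the source is bounded
///     None,                     // no row limit
/// )?;
/// ```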
#[derive(Clone)]
pub struct StreamingTableExec {
    partitions: Vec<Arc<dyn PartitionStream>>,
    projection: Option<Arc<[usize]>>,
    projected_schema: SchemaRef,
    projected_output_ordering: Vec<LexOrdering>,
    infinite: bool,
    limit: Option<usize>,
    cache: PlanProperties,
    metrics: ExecutionPlanMetricsSet,
}

impl StreamingTableExec {
    /// Try to create a new [`StreamingTableExec`], returning an error if the schema is incorrect
    pub fn try_new(
        schema: SchemaRef,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<&Vec<usize>>,
        projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    ) -> Result<Self> {
        for x in partitions.iter() {
            let partition_schema = x.schema();
            if !schema.eq(partition_schema) {
                debug!(
                    "Target schema does not match partition schema. \
                        Target schema: {schema:?}. Partition schema: {partition_schema:?}"
                );
                return plan_err!("Mismatch between schema and batches");
            }
        }

        let projected_schema = match projection {
            Some(p) => Arc::new(schema.project(p)?),
            None => schema,
        };
        let projected_output_ordering =
            projected_output_ordering.into_iter().collect::<Vec<_>>();
        let cache = Self::compute_properties(
            Arc::clone(&projected_schema),
            projected_output_ordering.clone(),
            &partitions,
            infinite,
        );
        Ok(Self {
            partitions,
            projected_schema,
            projection: projection.cloned().map(Into::into),
            projected_output_ordering,
            infinite,
            limit,
            cache,
            metrics: ExecutionPlanMetricsSet::new(),
        })
    }

    /// The partition streams backing this plan
    pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
        &self.partitions
    }

    /// The (unprojected) schema of the underlying partitions
    pub fn partition_schema(&self) -> &SchemaRef {
        self.partitions[0].schema()
    }

    /// The projection applied to the partition schema, if any
    pub fn projection(&self) -> &Option<Arc<[usize]>> {
        &self.projection
    }

    /// The schema of this plan's output, after projection
    pub fn projected_schema(&self) -> &Schema {
        &self.projected_schema
    }

    /// The output orderings, expressed in terms of the projected schema
    pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
        self.projected_output_ordering.clone()
    }

    /// Whether this source is unbounded
    pub fn is_infinite(&self) -> bool {
        self.infinite
    }

    /// The maximum number of rows to produce, if any
    pub fn limit(&self) -> Option<usize> {
        self.limit
    }

    /// Creates the cache object that stores the plan properties such as the
    /// schema, equivalence properties, ordering, and partitioning.
    fn compute_properties(
        schema: SchemaRef,
        orderings: Vec<LexOrdering>,
        partitions: &[Arc<dyn PartitionStream>],
        infinite: bool,
    ) -> PlanProperties {
        // Calculate equivalence properties:
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

        // Get output partitioning:
        let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
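
        // Determine boundedness: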
        let boundedness = if infinite {
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            }
        } else {
            Boundedness::Bounded
        };
        PlanProperties::new(
            eq_properties,
            output_partitioning,
            EmissionType::Incremental,
            boundedness,
        )
        .with_scheduling_type(SchedulingType::Cooperative)
    }
}

impl Debug for StreamingTableExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamingTableExec").finish_non_exhaustive()
    }
}

impl DisplayAs for StreamingTableExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "StreamingTableExec: partition_sizes={:?}",
                    self.partitions.len(),
                )?;
                if !self.projected_schema.fields().is_empty() {
                    write!(
                        f,
                        ", projection={}",
                        ProjectSchemaDisplay(&self.projected_schema)
                    )?;
                }
                if self.infinite {
                    write!(f, ", infinite_source=true")?;
                }
                if let Some(fetch) = self.limit {
                    write!(f, ", fetch={fetch}")?;
                }

                display_orderings(f, &self.projected_output_ordering)?;

                Ok(())
            }
            DisplayFormatType::TreeRender => {
                if self.infinite {
                    writeln!(f, "infinite={}", self.infinite)?;
                }
                if let Some(limit) = self.limit {
                    write!(f, "limit={limit}")?;
                } else {
                    write!(f, "limit=None")?;
                }

                Ok(())
            }
        }
    }
}

#[async_trait]
impl ExecutionPlan for StreamingTableExec {
    fn name(&self) -> &'static str {
        "StreamingTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.is_empty() {
            Ok(self)
        } else {
            internal_err!("Children cannot be replaced in {self:?}")
        }
    }

    fn execute(
        &self,
        partition: usize,
        ctx: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = self.partitions[partition].execute(Arc::clone(&ctx));
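        // Apply the projection, if any, by projecting each batch as it
        // streams through: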
        let projected_stream = match self.projection.clone() {
            Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
                Arc::clone(&self.projected_schema),
                stream.map(move |x| {
                    x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
                }),
            )),
            None => stream,
        };
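        // Wrap the stream so that long-running sources periodically yield
        // control back to the runtime (cooperative scheduling):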
        let stream = make_cooperative(projected_stream);

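        // Apply the limit, if any, tracking the row count in this plan's
        // baseline metrics: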
        Ok(match self.limit {
            None => stream,
            Some(fetch) => {
                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                Box::pin(LimitStream::new(stream, 0, Some(fetch), baseline_metrics))
            }
        })
    }

    /// Tries to embed `projection` into its input (the streaming table).
    /// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
    /// returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Pushdown is only possible when every projection expression is an
        // alias-free column reference:
        if !all_alias_free_columns(projection.expr()) {
            return Ok(None);
        }

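        // Compose the pushed-down projection with any projection already
        // applied by this table (or with the identity projection):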
        let streaming_table_projections =
            self.projection().as_ref().map(|i| i.as_ref().to_vec());
        let new_projections = new_projections_for_columns(
            projection.expr(),
            &streaming_table_projections
                .unwrap_or_else(|| (0..self.schema().fields().len()).collect()),
        );

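        // Rewrite each output ordering in terms of the new projection; give
        // up if any ordering cannot be expressed after projection: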
        let mut lex_orderings = vec![];
        for ordering in self.projected_output_ordering().into_iter() {
            let Some(ordering) = update_ordering(ordering, projection.expr())? else {
                return Ok(None);
            };
            lex_orderings.push(ordering);
        }

        StreamingTableExec::try_new(
            Arc::clone(self.partition_schema()),
            self.partitions().clone(),
            Some(new_projections.as_ref()),
            lex_orderings,
            self.is_infinite(),
            self.limit(),
        )
        .map(|e| Some(Arc::new(e) as _))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(StreamingTableExec {
            partitions: self.partitions.clone(),
            projection: self.projection.clone(),
            projected_schema: Arc::clone(&self.projected_schema),
            projected_output_ordering: self.projected_output_ordering.clone(),
            infinite: self.infinite,
            limit,
            cache: self.cache.clone(),
            metrics: self.metrics.clone(),
        }))
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::collect_partitioned;
    use crate::streaming::PartitionStream;
    use crate::test::{make_partition, TestPartitionStream};
    use arrow::record_batch::RecordBatch;

    #[tokio::test]
    async fn test_no_limit() {
        let exec = TestBuilder::new()
            // Make 2 batches, each with 100 rows
            .with_batches(vec![make_partition(100), make_partition(100)])
            .build();

        let counts = collect_num_rows(Arc::new(exec)).await;
        assert_eq!(counts, vec![200]);
    }

    #[tokio::test]
    async fn test_limit() {
        let exec = TestBuilder::new()
            // Make 2 batches, each with 100 rows
            .with_batches(vec![make_partition(100), make_partition(100)])
            // Return only the first 75 rows
            .with_limit(Some(75))
            .build();

        let counts = collect_num_rows(Arc::new(exec)).await;
        assert_eq!(counts, vec![75]);
    }

    /// Runs the provided execution plan and returns a vector of the number of
    /// rows in each partition
    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
        let ctx = Arc::new(TaskContext::default());
        let partition_batches = collect_partitioned(exec, ctx).await.unwrap();
        partition_batches
            .into_iter()
            .map(|batches| batches.iter().map(|b| b.num_rows()).sum::<usize>())
            .collect()
    }

    #[derive(Default)]
    struct TestBuilder {
        schema: Option<SchemaRef>,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<Vec<usize>>,
        projected_output_ordering: Vec<LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    }

    impl TestBuilder {
        fn new() -> Self {
            Self::default()
        }

        /// Set the batches for the stream
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            let stream = TestPartitionStream::new_with_batches(batches);
            self.schema = Some(Arc::clone(stream.schema()));
            self.partitions = vec![Arc::new(stream)];
            self
        }

        /// Set the limit for the stream
        fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Build the [`StreamingTableExec`] under test
        fn build(self) -> StreamingTableExec {
            StreamingTableExec::try_new(
                self.schema.unwrap(),
                self.partitions,
                self.projection.as_ref(),
                self.projected_output_ordering,
                self.infinite,
                self.limit,
            )
            .unwrap()
        }
    }
}