Skip to main content

datafusion_catalog/
streaming.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A simplified [`TableProvider`] for streaming partitioned datasets
19
20use std::any::Any;
21use std::sync::Arc;
22
23use arrow::datatypes::SchemaRef;
24use async_trait::async_trait;
25use datafusion_common::{DFSchema, Result, plan_err};
26use datafusion_expr::{Expr, SortExpr, TableType};
27use datafusion_physical_expr::equivalence::project_ordering;
28use datafusion_physical_expr::{LexOrdering, create_physical_sort_exprs};
29use datafusion_physical_plan::ExecutionPlan;
30use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
31use log::debug;
32
33use crate::{Session, TableProvider};
34
35/// A [`TableProvider`] that streams a set of [`PartitionStream`]
36#[derive(Debug)]
37pub struct StreamingTable {
38    schema: SchemaRef,
39    partitions: Vec<Arc<dyn PartitionStream>>,
40    infinite: bool,
41    sort_order: Vec<SortExpr>,
42}
43
44impl StreamingTable {
45    /// Try to create a new [`StreamingTable`] returning an error if the schema is incorrect
46    pub fn try_new(
47        schema: SchemaRef,
48        partitions: Vec<Arc<dyn PartitionStream>>,
49    ) -> Result<Self> {
50        for x in partitions.iter() {
51            let partition_schema = x.schema();
52            if !schema.contains(partition_schema) {
53                debug!(
54                    "target schema does not contain partition schema. \
55                        Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
56                );
57                return plan_err!("Mismatch between schema and batches");
58            }
59        }
60
61        Ok(Self {
62            schema,
63            partitions,
64            infinite: false,
65            sort_order: vec![],
66        })
67    }
68
69    /// Sets streaming table can be infinite.
70    pub fn with_infinite_table(mut self, infinite: bool) -> Self {
71        self.infinite = infinite;
72        self
73    }
74
75    /// Sets the existing ordering of streaming table.
76    pub fn with_sort_order(mut self, sort_order: Vec<SortExpr>) -> Self {
77        self.sort_order = sort_order;
78        self
79    }
80}
81
82#[async_trait]
83impl TableProvider for StreamingTable {
84    fn as_any(&self) -> &dyn Any {
85        self
86    }
87
88    fn schema(&self) -> SchemaRef {
89        Arc::clone(&self.schema)
90    }
91
92    fn table_type(&self) -> TableType {
93        TableType::View
94    }
95
96    async fn scan(
97        &self,
98        state: &dyn Session,
99        projection: Option<&Vec<usize>>,
100        _filters: &[Expr],
101        limit: Option<usize>,
102    ) -> Result<Arc<dyn ExecutionPlan>> {
103        let physical_sort = if !self.sort_order.is_empty() {
104            let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
105            let eqp = state.execution_props();
106
107            let original_sort_exprs =
108                create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?;
109
110            if let Some(p) = projection {
111                // When performing a projection, the output columns will not match
112                // the original physical sort expression indices. Also the sort columns
113                // may not be in the output projection. To correct for these issues
114                // we need to project the ordering based on the output schema.
115                let schema = Arc::new(self.schema.project(p)?);
116                LexOrdering::new(original_sort_exprs)
117                    .and_then(|lex_ordering| project_ordering(&lex_ordering, &schema))
118                    .map(|lex_ordering| lex_ordering.to_vec())
119                    .unwrap_or_default()
120            } else {
121                original_sort_exprs
122            }
123        } else {
124            vec![]
125        };
126
127        Ok(Arc::new(StreamingTableExec::try_new(
128            Arc::clone(&self.schema),
129            self.partitions.clone(),
130            projection,
131            LexOrdering::new(physical_sort),
132            self.infinite,
133            limit,
134        )?))
135    }
136}