datafusion_catalog/
streaming.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A simplified [`TableProvider`] for streaming partitioned datasets
19
20use std::any::Any;
21use std::sync::Arc;
22
23use arrow::datatypes::SchemaRef;
24use async_trait::async_trait;
25
26use crate::Session;
27use crate::TableProvider;
28use datafusion_common::{plan_err, Result};
29use datafusion_expr::{Expr, TableType};
30use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
31use datafusion_physical_plan::ExecutionPlan;
32use log::debug;
33
34/// A [`TableProvider`] that streams a set of [`PartitionStream`]
35#[derive(Debug)]
36pub struct StreamingTable {
37    schema: SchemaRef,
38    partitions: Vec<Arc<dyn PartitionStream>>,
39    infinite: bool,
40}
41
42impl StreamingTable {
43    /// Try to create a new [`StreamingTable`] returning an error if the schema is incorrect
44    pub fn try_new(
45        schema: SchemaRef,
46        partitions: Vec<Arc<dyn PartitionStream>>,
47    ) -> Result<Self> {
48        for x in partitions.iter() {
49            let partition_schema = x.schema();
50            if !schema.contains(partition_schema) {
51                debug!(
52                    "target schema does not contain partition schema. \
53                        Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
54                );
55                return plan_err!("Mismatch between schema and batches");
56            }
57        }
58
59        Ok(Self {
60            schema,
61            partitions,
62            infinite: false,
63        })
64    }
65    /// Sets streaming table can be infinite.
66    pub fn with_infinite_table(mut self, infinite: bool) -> Self {
67        self.infinite = infinite;
68        self
69    }
70}
71
72#[async_trait]
73impl TableProvider for StreamingTable {
74    fn as_any(&self) -> &dyn Any {
75        self
76    }
77
78    fn schema(&self) -> SchemaRef {
79        Arc::clone(&self.schema)
80    }
81
82    fn table_type(&self) -> TableType {
83        TableType::View
84    }
85
86    async fn scan(
87        &self,
88        _state: &dyn Session,
89        projection: Option<&Vec<usize>>,
90        _filters: &[Expr],
91        limit: Option<usize>,
92    ) -> Result<Arc<dyn ExecutionPlan>> {
93        Ok(Arc::new(StreamingTableExec::try_new(
94            Arc::clone(&self.schema),
95            self.partitions.clone(),
96            projection,
97            None,
98            self.infinite,
99            limit,
100        )?))
101    }
102}