Skip to main content

datafusion_catalog/
default_table_source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Default TableSource implementation used in DataFusion physical plans
19
20use std::borrow::Cow;
21use std::sync::Arc;
22
23use crate::TableProvider;
24
25use arrow::datatypes::SchemaRef;
26use datafusion_common::{Constraints, internal_err};
27use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType};
28
29/// Implements [`TableSource`] for a [`TableProvider`]
30///
31/// This structure adapts a [`TableProvider`] (a physical plan trait) to the
32/// [`TableSource`] (logical plan trait).
33///
34/// It is used so logical plans in the `datafusion_expr` crate do not have a
35/// direct dependency on physical plans, such as [`TableProvider`]s.
36pub struct DefaultTableSource {
37    /// table provider
38    pub table_provider: Arc<dyn TableProvider>,
39}
40
41impl DefaultTableSource {
42    /// Create a new DefaultTableSource to wrap a TableProvider
43    pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
44        Self { table_provider }
45    }
46}
47
48impl TableSource for DefaultTableSource {
49    /// Get a reference to the schema for this table
50    fn schema(&self) -> SchemaRef {
51        self.table_provider.schema()
52    }
53
54    /// Get a reference to applicable constraints, if any exists.
55    fn constraints(&self) -> Option<&Constraints> {
56        self.table_provider.constraints()
57    }
58
59    /// Get the type of this table for metadata/catalog purposes.
60    fn table_type(&self) -> TableType {
61        self.table_provider.table_type()
62    }
63
64    /// Tests whether the table provider can make use of any or all filter expressions
65    /// to optimize data retrieval.
66    fn supports_filters_pushdown(
67        &self,
68        filter: &[&Expr],
69    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
70        self.table_provider.supports_filters_pushdown(filter)
71    }
72
73    fn get_logical_plan(&'_ self) -> Option<Cow<'_, datafusion_expr::LogicalPlan>> {
74        self.table_provider.get_logical_plan()
75    }
76
77    fn get_column_default(&self, column: &str) -> Option<&Expr> {
78        self.table_provider.get_column_default(column)
79    }
80}
81
82/// Wrap TableProvider in TableSource
83pub fn provider_as_source(
84    table_provider: Arc<dyn TableProvider>,
85) -> Arc<dyn TableSource> {
86    Arc::new(DefaultTableSource::new(table_provider))
87}
88
89/// Attempt to downcast a TableSource to DefaultTableSource and access the
90/// TableProvider. This will only work with a TableSource created by DataFusion.
91pub fn source_as_provider(
92    source: &Arc<dyn TableSource>,
93) -> datafusion_common::Result<Arc<dyn TableProvider>> {
94    match source.as_ref().downcast_ref::<DefaultTableSource>() {
95        Some(source) => Ok(Arc::clone(&source.table_provider)),
96        _ => internal_err!("TableSource was not DefaultTableSource"),
97    }
98}
99
/// Checks that the table type reported by a provider is still visible after
/// the provider has been wrapped in a [`DefaultTableSource`].
#[test]
fn preserves_table_type() {
    use async_trait::async_trait;
    use datafusion_common::DataFusionError;

    // Stub provider: only `table_type` is expected to be called by this test
    #[derive(Debug)]
    struct TemporaryProvider;

    #[async_trait]
    impl TableProvider for TemporaryProvider {
        fn schema(&self) -> SchemaRef {
            unimplemented!()
        }

        fn table_type(&self) -> TableType {
            TableType::Temporary
        }

        async fn scan(
            &self,
            _state: &dyn crate::Session,
            _projection: Option<&Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>, DataFusionError>
        {
            unimplemented!()
        }
    }

    let provider = Arc::new(TemporaryProvider);
    let wrapped = DefaultTableSource::new(provider);
    assert_eq!(wrapped.table_type(), TableType::Temporary);
}