datafusion_catalog/
default_table_source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Default TableSource implementation used in DataFusion physical plans
19
20use std::sync::Arc;
21use std::{any::Any, borrow::Cow};
22
23use crate::TableProvider;
24
25use arrow::datatypes::SchemaRef;
26use datafusion_common::{internal_err, Constraints};
27use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType};
28
29/// Implements [`TableSource`] for a [`TableProvider`]
30///
31/// This structure adapts a [`TableProvider`] (a physical plan trait) to the
32/// [`TableSource`] (logical plan trait).
33///
34/// It is used so logical plans in the `datafusion_expr` crate do not have a
35/// direct dependency on physical plans, such as [`TableProvider`]s.
36pub struct DefaultTableSource {
37    /// table provider
38    pub table_provider: Arc<dyn TableProvider>,
39}
40
41impl DefaultTableSource {
42    /// Create a new DefaultTableSource to wrap a TableProvider
43    pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
44        Self { table_provider }
45    }
46}
47
48impl TableSource for DefaultTableSource {
49    /// Returns the table source as [`Any`] so that it can be
50    /// downcast to a specific implementation.
51    fn as_any(&self) -> &dyn Any {
52        self
53    }
54
55    /// Get a reference to the schema for this table
56    fn schema(&self) -> SchemaRef {
57        self.table_provider.schema()
58    }
59
60    /// Get a reference to applicable constraints, if any exists.
61    fn constraints(&self) -> Option<&Constraints> {
62        self.table_provider.constraints()
63    }
64
65    /// Get the type of this table for metadata/catalog purposes.
66    fn table_type(&self) -> TableType {
67        self.table_provider.table_type()
68    }
69
70    /// Tests whether the table provider can make use of any or all filter expressions
71    /// to optimize data retrieval.
72    fn supports_filters_pushdown(
73        &self,
74        filter: &[&Expr],
75    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
76        self.table_provider.supports_filters_pushdown(filter)
77    }
78
79    fn get_logical_plan(&'_ self) -> Option<Cow<'_, datafusion_expr::LogicalPlan>> {
80        self.table_provider.get_logical_plan()
81    }
82
83    fn get_column_default(&self, column: &str) -> Option<&Expr> {
84        self.table_provider.get_column_default(column)
85    }
86}
87
88/// Wrap TableProvider in TableSource
89pub fn provider_as_source(
90    table_provider: Arc<dyn TableProvider>,
91) -> Arc<dyn TableSource> {
92    Arc::new(DefaultTableSource::new(table_provider))
93}
94
95/// Attempt to downcast a TableSource to DefaultTableSource and access the
96/// TableProvider. This will only work with a TableSource created by DataFusion.
97pub fn source_as_provider(
98    source: &Arc<dyn TableSource>,
99) -> datafusion_common::Result<Arc<dyn TableProvider>> {
100    match source
101        .as_ref()
102        .as_any()
103        .downcast_ref::<DefaultTableSource>()
104    {
105        Some(source) => Ok(Arc::clone(&source.table_provider)),
106        _ => internal_err!("TableSource was not DefaultTableSource"),
107    }
108}
109
// Checks that `DefaultTableSource::table_type` reports the table type of the
// provider it wraps.
#[test]
fn preserves_table_type() {
    use async_trait::async_trait;

    // Minimal provider: only `as_any` and `table_type` are implemented; the
    // remaining required methods are stubs that the test never calls.
    #[derive(Debug)]
    struct TestTempTable;

    #[async_trait]
    impl TableProvider for TestTempTable {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            unimplemented!()
        }

        fn table_type(&self) -> TableType {
            TableType::Temporary
        }

        async fn scan(
            &self,
            _state: &dyn crate::Session,
            _projection: Option<&Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> datafusion_common::Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>>
        {
            unimplemented!()
        }
    }

    let provider: Arc<dyn TableProvider> = Arc::new(TestTempTable);
    let table_source = DefaultTableSource::new(provider);
    assert_eq!(table_source.table_type(), TableType::Temporary);
}