datafusion_catalog/
default_table_source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Default TableSource implementation used in DataFusion physical plans
19
20use std::sync::Arc;
21use std::{any::Any, borrow::Cow};
22
23use crate::TableProvider;
24
25use arrow::datatypes::SchemaRef;
26use datafusion_common::{internal_err, Constraints};
27use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType};
28
29/// Implements [`TableSource`] for a [`TableProvider`]
30///
31/// This structure adapts a [`TableProvider`] (a physical plan trait) to the
32/// [`TableSource`] (logical plan trait).
33///
34/// It is used so logical plans in the `datafusion_expr` crate do not have a
35/// direct dependency on physical plans, such as [`TableProvider`]s.
36///
37/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html
38pub struct DefaultTableSource {
39    /// table provider
40    pub table_provider: Arc<dyn TableProvider>,
41}
42
43impl DefaultTableSource {
44    /// Create a new DefaultTableSource to wrap a TableProvider
45    pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
46        Self { table_provider }
47    }
48}
49
50impl TableSource for DefaultTableSource {
51    /// Returns the table source as [`Any`] so that it can be
52    /// downcast to a specific implementation.
53    fn as_any(&self) -> &dyn Any {
54        self
55    }
56
57    /// Get a reference to the schema for this table
58    fn schema(&self) -> SchemaRef {
59        self.table_provider.schema()
60    }
61
62    /// Get a reference to applicable constraints, if any exists.
63    fn constraints(&self) -> Option<&Constraints> {
64        self.table_provider.constraints()
65    }
66
67    /// Get the type of this table for metadata/catalog purposes.
68    fn table_type(&self) -> TableType {
69        self.table_provider.table_type()
70    }
71
72    /// Tests whether the table provider can make use of any or all filter expressions
73    /// to optimize data retrieval.
74    fn supports_filters_pushdown(
75        &self,
76        filter: &[&Expr],
77    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
78        self.table_provider.supports_filters_pushdown(filter)
79    }
80
81    fn get_logical_plan(&self) -> Option<Cow<datafusion_expr::LogicalPlan>> {
82        self.table_provider.get_logical_plan()
83    }
84
85    fn get_column_default(&self, column: &str) -> Option<&Expr> {
86        self.table_provider.get_column_default(column)
87    }
88}
89
90/// Wrap TableProvider in TableSource
91pub fn provider_as_source(
92    table_provider: Arc<dyn TableProvider>,
93) -> Arc<dyn TableSource> {
94    Arc::new(DefaultTableSource::new(table_provider))
95}
96
97/// Attempt to downcast a TableSource to DefaultTableSource and access the
98/// TableProvider. This will only work with a TableSource created by DataFusion.
99pub fn source_as_provider(
100    source: &Arc<dyn TableSource>,
101) -> datafusion_common::Result<Arc<dyn TableProvider>> {
102    match source
103        .as_ref()
104        .as_any()
105        .downcast_ref::<DefaultTableSource>()
106    {
107        Some(source) => Ok(Arc::clone(&source.table_provider)),
108        _ => internal_err!("TableSource was not DefaultTableSource"),
109    }
110}
111
112#[test]
113fn preserves_table_type() {
114    use async_trait::async_trait;
115    use datafusion_common::DataFusionError;
116
117    #[derive(Debug)]
118    struct TestTempTable;
119
120    #[async_trait]
121    impl TableProvider for TestTempTable {
122        fn as_any(&self) -> &dyn Any {
123            self
124        }
125
126        fn table_type(&self) -> TableType {
127            TableType::Temporary
128        }
129
130        fn schema(&self) -> SchemaRef {
131            unimplemented!()
132        }
133
134        async fn scan(
135            &self,
136            _: &dyn crate::Session,
137            _: Option<&Vec<usize>>,
138            _: &[Expr],
139            _: Option<usize>,
140        ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>, DataFusionError>
141        {
142            unimplemented!()
143        }
144    }
145
146    let table_source = DefaultTableSource::new(Arc::new(TestTempTable));
147    assert_eq!(table_source.table_type(), TableType::Temporary);
148}