datafusion_catalog/table.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

use crate::session::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
    CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;

/// A table which can be queried and modified.
///
/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
///
/// [`TableProvider`] represents a source of data which can provide data as
/// Apache Arrow [`RecordBatch`]es. Implementations of this trait provide
/// important information for planning such as:
///
/// 1. [`Self::schema`]: The schema (columns and their types) of the table
/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
/// 3. [`Self::scan`]: An [`ExecutionPlan`] that can read data
///
/// [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
/// [`CatalogProvider`]: super::CatalogProvider
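///
/// For example, a minimal sketch of a provider over a fixed schema that scans
/// no rows (`EmptyTable` is a hypothetical name and `EmptyExec` is used here
/// only as a convenient placeholder plan):
///
/// ```rust
/// # use std::any::Any;
/// # use std::sync::Arc;
/// # use arrow::datatypes::SchemaRef;
/// # use async_trait::async_trait;
/// # use datafusion_catalog::{Session, TableProvider};
/// # use datafusion_common::Result;
/// # use datafusion_expr::{Expr, TableType};
/// # use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan};
/// #[derive(Debug)]
/// struct EmptyTable {
///     schema: SchemaRef,
/// }
///
/// #[async_trait]
/// impl TableProvider for EmptyTable {
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
///     fn schema(&self) -> SchemaRef {
///         Arc::clone(&self.schema)
///     }
///     fn table_type(&self) -> TableType {
///         TableType::Base
///     }
///     async fn scan(
///         &self,
///         _state: &dyn Session,
///         _projection: Option<&Vec<usize>>,
///         _filters: &[Expr],
///         _limit: Option<usize>,
///     ) -> Result<Arc<dyn ExecutionPlan>> {
///         // A real provider would honor the projection, filters and limit here.
///         Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema))))
///     }
/// }
/// ```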
#[async_trait]
pub trait TableProvider: Debug + Sync + Send {
    /// Returns the table provider as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Get a reference to the schema for this table
    fn schema(&self) -> SchemaRef;

    /// Get a reference to the constraints of the table.
    ///
    /// Returns:
    /// - `None` for tables that do not support constraints.
    /// - `Some(&Constraints)` for tables supporting constraints.
    ///
    /// Therefore, a `Some(&Constraints::empty())` return value indicates that
    /// this table supports constraints, but there are no constraints.
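    ///
    /// For illustration, a sketch of the contract described above:
    ///
    /// ```rust
    /// # use datafusion_common::Constraints;
    /// // `Some` of an empty set means "constraints supported, none defined",
    /// // while `None` means "constraints not supported at all".
    /// let supported_but_empty: Option<Constraints> = Some(Constraints::empty());
    /// let unsupported: Option<Constraints> = None;
    /// # let _ = (supported_but_empty, unsupported);
    /// ```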
    fn constraints(&self) -> Option<&Constraints> {
        None
    }

    /// Get the type of this table for metadata/catalog purposes.
    fn table_type(&self) -> TableType;

    /// Get the create statement used to create this table, if available.
    fn get_table_definition(&self) -> Option<&str> {
        None
    }

    /// Get the [`LogicalPlan`] of this table, if available.
    fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
        None
    }

    /// Get the default value for a column, if available.
    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
        None
    }

    /// Create an [`ExecutionPlan`] for scanning the table with optionally
    /// specified `projection`, `filter` and `limit`, described below.
    ///
    /// The `ExecutionPlan` is responsible for scanning the datasource's
    /// partitions in a streaming, parallelized fashion.
    ///
    /// # Projection
    ///
    /// If specified, only a subset of columns should be returned, in the order
    /// specified. The projection is a set of indexes of the fields in
    /// [`Self::schema`].
    ///
    /// DataFusion provides the projection to scan only the columns actually
    /// used in the query to improve performance, an optimization called
    /// "Projection Pushdown". Some datasources, such as Parquet, can use this
    /// information to go significantly faster when only a subset of columns is
    /// required.
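    ///
    /// For example (a sketch; the schema and indices below are hypothetical),
    /// a provider can derive the projected schema from the given indices using
    /// Arrow's `Schema::project`:
    ///
    /// ```rust
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// let table_schema = Schema::new(vec![
    ///     Field::new("a", DataType::Int32, false),
    ///     Field::new("b", DataType::Utf8, false),
    /// ]);
    /// // DataFusion would pass `Some(&vec![1])` if only column "b" is needed.
    /// let projection = vec![1];
    /// let projected = table_schema.project(&projection).unwrap();
    /// assert_eq!(projected.fields().len(), 1);
    /// ```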
    ///
    /// # Filters
    ///
    /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
    /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
    /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the
    /// expressions are `AND`ed together).
    ///
    /// To enable filter pushdown you must override
    /// [`Self::supports_filters_pushdown`], as the default implementation does
    /// not support pushdown and `filters` will be empty.
    ///
    /// DataFusion pushes filtering into the scans whenever possible
    /// ("Filter Pushdown"), and depending on the format and the
    /// implementation of the format, evaluating the predicate during the scan
    /// can increase performance significantly.
    ///
    /// ## Note: Some columns may appear *only* in Filters
    ///
    /// In certain cases, a query may only use a certain column in a Filter that
    /// has been completely pushed down to the scan. In this case, the
    /// projection will not contain all the columns found in the filter
    /// expressions.
    ///
    /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
    ///
    /// ```text
    /// ┌────────────────────┐
    /// │  Projection(t.a)   │
    /// └────────────────────┘
    ///            ▲
    ///            │
    ///            │
    /// ┌────────────────────┐     Filter     ┌────────────────────┐   Projection    ┌────────────────────┐
    /// │  Filter(t.b > 5)   │────Pushdown──▶ │  Projection(t.a)   │ ───Pushdown───▶ │  Projection(t.a)   │
    /// └────────────────────┘                └────────────────────┘                 └────────────────────┘
    ///            ▲                                     ▲                                      ▲
    ///            │                                     │                                      │
    ///            │                                     │                           ┌────────────────────┐
    /// ┌────────────────────┐                ┌────────────────────┐                 │        Scan        │
    /// │        Scan        │                │        Scan        │                 │  filter=(t.b > 5)  │
    /// └────────────────────┘                │  filter=(t.b > 5)  │                 │  projection=(t.a)  │
    ///                                       └────────────────────┘                 └────────────────────┘
    ///
    /// Initial Plan                  If `TableProviderFilterPushDown`           Projection pushdown notes that
    ///                               returns true, filter pushdown              the scan only needs t.a
    ///                               pushes the filter into the scan
    ///                                                                          BUT internally evaluating the
    ///                                                                          predicate still requires t.b
    /// ```
    ///
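    /// As a concrete illustration (a sketch using the hypothetical column from
    /// the example above), the pushed-down predicate for `WHERE t.b > 5`
    /// arrives in `filters` as a single boolean [`Expr`]:
    ///
    /// ```rust
    /// # use datafusion_expr::{col, lit, Expr};
    /// // Filters passed to `scan` are implicitly ANDed together.
    /// let filter: Expr = col("t.b").gt(lit(5));
    /// # let _ = filter;
    /// ```
    ///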
    /// # Limit
    ///
    /// If `limit` is specified, the scan must produce *at least* this many rows
    /// (though it may return more). Like Projection Pushdown and Filter
    /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as
    /// possible, an optimization called "Limit Pushdown", as some sources can
    /// use this information to improve their performance. Note that if there
    /// are any Inexact filters pushed down, the LIMIT cannot be pushed down.
    /// This is because inexact filters do not guarantee that every filtered
    /// row is removed, so applying the limit could lead to too few rows being
    /// available to return as a final result.
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Specify if DataFusion should provide filter expressions to the
    /// TableProvider to apply *during* the scan.
    ///
    /// Some TableProviders can evaluate filters more efficiently than the
    /// `Filter` operator in DataFusion, for example by using an index.
    ///
    /// # Parameters and Return Value
    ///
    /// The returned `Vec` must have one element for each element of the
    /// `filters` argument. The value of each element indicates if the
    /// TableProvider can apply the corresponding filter during the scan. The
    /// position in the return value corresponds to the expression in the
    /// `filters` parameter.
    ///
    /// If the length of the resulting `Vec` does not match the `filters` input,
    /// an error is returned.
    ///
    /// Each element in the resulting `Vec` is one of the following:
    /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
    ///   during scan
    /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
    ///
    /// By default, this function returns [`Unsupported`] for all filters,
    /// meaning no filters will be provided to [`Self::scan`].
    ///
    /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
    /// [`Exact`]: TableProviderFilterPushDown::Exact
    /// [`Inexact`]: TableProviderFilterPushDown::Inexact
    ///
    /// # Example
    ///
    /// ```rust
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::SchemaRef;
    /// # use async_trait::async_trait;
    /// # use datafusion_catalog::{TableProvider, Session};
    /// # use datafusion_common::Result;
    /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
    /// # use datafusion_physical_plan::ExecutionPlan;
    /// // Define a struct that implements the TableProvider trait
    /// #[derive(Debug)]
    /// struct TestDataSource {}
    ///
    /// #[async_trait]
    /// impl TableProvider for TestDataSource {
    /// # fn as_any(&self) -> &dyn Any { todo!() }
    /// # fn schema(&self) -> SchemaRef { todo!() }
    /// # fn table_type(&self) -> TableType { todo!() }
    /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
    /// #     todo!()
    /// # }
    ///     // Override the supports_filters_pushdown to evaluate which expressions
    ///     // to accept as pushdown predicates.
    ///     fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
    ///         // Process each filter
    ///         let support: Vec<_> = filters.iter().map(|expr| {
    ///             match expr {
    ///                 // This example only supports a between expr with a single column named "c1".
    ///                 Expr::Between(between_expr) => {
    ///                     between_expr.expr
    ///                         .try_as_col()
    ///                         .map(|column| {
    ///                             if column.name == "c1" {
    ///                                 TableProviderFilterPushDown::Exact
    ///                             } else {
    ///                                 TableProviderFilterPushDown::Unsupported
    ///                             }
    ///                         })
    ///                         // If there is no column in the expr set the filter to unsupported.
    ///                         .unwrap_or(TableProviderFilterPushDown::Unsupported)
    ///                 }
    ///                 _ => {
    ///                     // For all other cases return Unsupported.
    ///                     TableProviderFilterPushDown::Unsupported
    ///                 }
    ///             }
    ///         }).collect();
    ///         Ok(support)
    ///     }
    /// }
    /// ```
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![
            TableProviderFilterPushDown::Unsupported;
            filters.len()
        ])
    }

    /// Get statistics for this table, if available.
    ///
    /// Although not presently used in mainline DataFusion, this allows
    /// implementation-specific behavior for downstream repositories, in
    /// conjunction with specialized optimizer rules, to perform operations
    /// such as re-ordering of joins.
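    ///
    /// For example (a sketch; `my_schema` is hypothetical), a provider without
    /// detailed metadata could report unknown statistics derived from its
    /// schema:
    ///
    /// ```rust
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_common::Statistics;
    /// let my_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    /// // `new_unknown` fills in absent/unknown values for every column.
    /// let stats = Statistics::new_unknown(&my_schema);
    /// assert_eq!(stats.column_statistics.len(), 1);
    /// ```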
    fn statistics(&self) -> Option<Statistics> {
        None
    }

    /// Return an [`ExecutionPlan`] to insert data into this table, if
    /// supported.
    ///
    /// The returned plan should return a single row in a UInt64
    /// column called "count" such as the following:
    ///
    /// ```text
    /// +-------+
    /// | count |
    /// +-------+
    /// | 6     |
    /// +-------+
    /// ```
    ///
    /// # See Also
    ///
    /// See [`DataSinkExec`] for the common pattern of inserting streams of
    /// `RecordBatch`es as files to an ObjectStore.
    ///
    /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
    async fn insert_into(
        &self,
        _state: &dyn Session,
        _input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Insert into not implemented for this table")
    }
}

/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
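///
/// A minimal sketch (the `DirTableFactory` name is hypothetical and the actual
/// provider construction is elided) might look like:
///
/// ```rust
/// # use std::sync::Arc;
/// # use async_trait::async_trait;
/// # use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
/// # use datafusion_common::{not_impl_err, Result};
/// # use datafusion_expr::CreateExternalTable;
/// #[derive(Debug)]
/// struct DirTableFactory {}
///
/// #[async_trait]
/// impl TableProviderFactory for DirTableFactory {
///     async fn create(
///         &self,
///         _state: &dyn Session,
///         cmd: &CreateExternalTable,
///     ) -> Result<Arc<dyn TableProvider>> {
///         // A real factory would list `cmd.location` and build a provider here.
///         not_impl_err!("create a table for location {}", cmd.location)
///     }
/// }
/// ```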
#[async_trait]
pub trait TableProviderFactory: Debug + Sync + Send {
    /// Create a TableProvider with the given url
    async fn create(
        &self,
        state: &dyn Session,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>>;
}

/// A trait for table function implementations
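///
/// # Example
///
/// A sketch (the `EchoFunction` name is hypothetical and the provider
/// construction is elided) of wiring an implementation into a
/// [`TableFunction`]:
///
/// ```rust
/// # use std::sync::Arc;
/// # use datafusion_catalog::{TableFunction, TableFunctionImpl, TableProvider};
/// # use datafusion_common::{not_impl_err, Result};
/// # use datafusion_expr::Expr;
/// #[derive(Debug)]
/// struct EchoFunction {}
///
/// impl TableFunctionImpl for EchoFunction {
///     fn call(&self, _args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
///         // A real implementation would build a TableProvider from `_args`.
///         not_impl_err!("build a TableProvider from the arguments")
///     }
/// }
///
/// // The implementation can then be registered under a name.
/// let func = TableFunction::new("echo".to_string(), Arc::new(EchoFunction {}));
/// assert_eq!(func.name(), "echo");
/// ```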
pub trait TableFunctionImpl: Debug + Sync + Send {
    /// Create a table provider
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}

/// A table that uses a function to generate data
#[derive(Debug)]
pub struct TableFunction {
    /// Name of the table function
    name: String,
    /// Function implementation
    fun: Arc<dyn TableFunctionImpl>,
}

impl TableFunction {
    /// Create a new table function
    pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
        Self { name, fun }
    }

    /// Get the name of the table function
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Get the implementation of the table function
    pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
        &self.fun
    }

    /// Get the function implementation and generate a table
    pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        self.fun.call(args)
    }
}