datafusion_catalog/table.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::borrow::Cow;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::session::Session;
24use arrow::datatypes::SchemaRef;
25use async_trait::async_trait;
26use datafusion_common::Result;
27use datafusion_common::{not_impl_err, Constraints, Statistics};
28use datafusion_expr::Expr;
29
30use datafusion_expr::dml::InsertOp;
31use datafusion_expr::{
32 CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
33};
34use datafusion_physical_plan::ExecutionPlan;
35
36/// A table which can be queried and modified.
37///
38/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
39///
40/// [`TableProvider`] represents a source of data which can provide data as
41/// Apache Arrow [`RecordBatch`]es. Implementations of this trait provide
42/// important information for planning such as:
43///
44/// 1. [`Self::schema`]: The schema (columns and their types) of the table
45/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
46/// 2. [`Self::scan`]: An [`ExecutionPlan`] that can read data
47///
48/// [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
49/// [`CatalogProvider`]: super::CatalogProvider
50#[async_trait]
51pub trait TableProvider: Debug + Sync + Send {
52 /// Returns the table provider as [`Any`] so that it can be
53 /// downcast to a specific implementation.
54 fn as_any(&self) -> &dyn Any;
55
56 /// Get a reference to the schema for this table
57 fn schema(&self) -> SchemaRef;
58
59 /// Get a reference to the constraints of the table.
60 /// Returns:
61 /// - `None` for tables that do not support constraints.
62 /// - `Some(&Constraints)` for tables supporting constraints.
63 /// Therefore, a `Some(&Constraints::empty())` return value indicates that
64 /// this table supports constraints, but there are no constraints.
65 fn constraints(&self) -> Option<&Constraints> {
66 None
67 }
68
69 /// Get the type of this table for metadata/catalog purposes.
70 fn table_type(&self) -> TableType;
71
72 /// Get the create statement used to create this table, if available.
73 fn get_table_definition(&self) -> Option<&str> {
74 None
75 }
76
77 /// Get the [`LogicalPlan`] of this table, if available.
78 fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
79 None
80 }
81
82 /// Get the default value for a column, if available.
83 fn get_column_default(&self, _column: &str) -> Option<&Expr> {
84 None
85 }
86
87 /// Create an [`ExecutionPlan`] for scanning the table with optionally
88 /// specified `projection`, `filter` and `limit`, described below.
89 ///
90 /// The `ExecutionPlan` is responsible scanning the datasource's
91 /// partitions in a streaming, parallelized fashion.
92 ///
93 /// # Projection
94 ///
95 /// If specified, only a subset of columns should be returned, in the order
96 /// specified. The projection is a set of indexes of the fields in
97 /// [`Self::schema`].
98 ///
99 /// DataFusion provides the projection to scan only the columns actually
100 /// used in the query to improve performance, an optimization called
101 /// "Projection Pushdown". Some datasources, such as Parquet, can use this
102 /// information to go significantly faster when only a subset of columns is
103 /// required.
104 ///
105 /// # Filters
106 ///
107 /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
108 /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
109 /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the
110 /// expressions are `AND`ed together).
111 ///
112 /// To enable filter pushdown you must override
113 /// [`Self::supports_filters_pushdown`] as the default implementation does
114 /// not and `filters` will be empty.
115 ///
116 /// DataFusion pushes filtering into the scans whenever possible
117 /// ("Filter Pushdown"), and depending on the format and the
118 /// implementation of the format, evaluating the predicate during the scan
119 /// can increase performance significantly.
120 ///
121 /// ## Note: Some columns may appear *only* in Filters
122 ///
123 /// In certain cases, a query may only use a certain column in a Filter that
124 /// has been completely pushed down to the scan. In this case, the
125 /// projection will not contain all the columns found in the filter
126 /// expressions.
127 ///
128 /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
129 ///
130 /// ```text
131 /// ┌────────────────────┐
132 /// │ Projection(t.a) │
133 /// └────────────────────┘
134 /// ▲
135 /// │
136 /// │
137 /// ┌────────────────────┐ Filter ┌────────────────────┐ Projection ┌────────────────────┐
138 /// │ Filter(t.b > 5) │────Pushdown──▶ │ Projection(t.a) │ ───Pushdown───▶ │ Projection(t.a) │
139 /// └────────────────────┘ └────────────────────┘ └────────────────────┘
140 /// ▲ ▲ ▲
141 /// │ │ │
142 /// │ │ ┌────────────────────┐
143 /// ┌────────────────────┐ ┌────────────────────┐ │ Scan │
144 /// │ Scan │ │ Scan │ │ filter=(t.b > 5) │
145 /// └────────────────────┘ │ filter=(t.b > 5) │ │ projection=(t.a) │
146 /// └────────────────────┘ └────────────────────┘
147 ///
148 /// Initial Plan If `TableProviderFilterPushDown` Projection pushdown notes that
149 /// returns true, filter pushdown the scan only needs t.a
150 /// pushes the filter into the scan
151 /// BUT internally evaluating the
152 /// predicate still requires t.b
153 /// ```
154 ///
155 /// # Limit
156 ///
157 /// If `limit` is specified, must only produce *at least* this many rows,
158 /// (though it may return more). Like Projection Pushdown and Filter
159 /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as
160 /// possible, called "Limit Pushdown" as some sources can use this
161 /// information to improve their performance. Note that if there are any
162 /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is
163 /// because inexact filters do not guarantee that every filtered row is
164 /// removed, so applying the limit could lead to too few rows being available
165 /// to return as a final result.
166 async fn scan(
167 &self,
168 state: &dyn Session,
169 projection: Option<&Vec<usize>>,
170 filters: &[Expr],
171 limit: Option<usize>,
172 ) -> Result<Arc<dyn ExecutionPlan>>;
173
174 /// Specify if DataFusion should provide filter expressions to the
175 /// TableProvider to apply *during* the scan.
176 ///
177 /// Some TableProviders can evaluate filters more efficiently than the
178 /// `Filter` operator in DataFusion, for example by using an index.
179 ///
180 /// # Parameters and Return Value
181 ///
182 /// The return `Vec` must have one element for each element of the `filters`
183 /// argument. The value of each element indicates if the TableProvider can
184 /// apply the corresponding filter during the scan. The position in the return
185 /// value corresponds to the expression in the `filters` parameter.
186 ///
187 /// If the length of the resulting `Vec` does not match the `filters` input
188 /// an error will be thrown.
189 ///
190 /// Each element in the resulting `Vec` is one of the following:
191 /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
192 /// during scan
193 /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
194 ///
195 /// By default, this function returns [`Unsupported`] for all filters,
196 /// meaning no filters will be provided to [`Self::scan`].
197 ///
198 /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
199 /// [`Exact`]: TableProviderFilterPushDown::Exact
200 /// [`Inexact`]: TableProviderFilterPushDown::Inexact
201 /// # Example
202 ///
203 /// ```rust
204 /// # use std::any::Any;
205 /// # use std::sync::Arc;
206 /// # use arrow::datatypes::SchemaRef;
207 /// # use async_trait::async_trait;
208 /// # use datafusion_catalog::{TableProvider, Session};
209 /// # use datafusion_common::Result;
210 /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
211 /// # use datafusion_physical_plan::ExecutionPlan;
212 /// // Define a struct that implements the TableProvider trait
213 /// #[derive(Debug)]
214 /// struct TestDataSource {}
215 ///
216 /// #[async_trait]
217 /// impl TableProvider for TestDataSource {
218 /// # fn as_any(&self) -> &dyn Any { todo!() }
219 /// # fn schema(&self) -> SchemaRef { todo!() }
220 /// # fn table_type(&self) -> TableType { todo!() }
221 /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
222 /// todo!()
223 /// # }
224 /// // Override the supports_filters_pushdown to evaluate which expressions
225 /// // to accept as pushdown predicates.
226 /// fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
227 /// // Process each filter
228 /// let support: Vec<_> = filters.iter().map(|expr| {
229 /// match expr {
230 /// // This example only supports a between expr with a single column named "c1".
231 /// Expr::Between(between_expr) => {
232 /// between_expr.expr
233 /// .try_as_col()
234 /// .map(|column| {
235 /// if column.name == "c1" {
236 /// TableProviderFilterPushDown::Exact
237 /// } else {
238 /// TableProviderFilterPushDown::Unsupported
239 /// }
240 /// })
241 /// // If there is no column in the expr set the filter to unsupported.
242 /// .unwrap_or(TableProviderFilterPushDown::Unsupported)
243 /// }
244 /// _ => {
245 /// // For all other cases return Unsupported.
246 /// TableProviderFilterPushDown::Unsupported
247 /// }
248 /// }
249 /// }).collect();
250 /// Ok(support)
251 /// }
252 /// }
253 /// ```
254 fn supports_filters_pushdown(
255 &self,
256 filters: &[&Expr],
257 ) -> Result<Vec<TableProviderFilterPushDown>> {
258 Ok(vec![
259 TableProviderFilterPushDown::Unsupported;
260 filters.len()
261 ])
262 }
263
264 /// Get statistics for this table, if available
265 /// Although not presently used in mainline DataFusion, this allows implementation specific
266 /// behavior for downstream repositories, in conjunction with specialized optimizer rules to
267 /// perform operations such as re-ordering of joins.
268 fn statistics(&self) -> Option<Statistics> {
269 None
270 }
271
272 /// Return an [`ExecutionPlan`] to insert data into this table, if
273 /// supported.
274 ///
275 /// The returned plan should return a single row in a UInt64
276 /// column called "count" such as the following
277 ///
278 /// ```text
279 /// +-------+,
280 /// | count |,
281 /// +-------+,
282 /// | 6 |,
283 /// +-------+,
284 /// ```
285 ///
286 /// # See Also
287 ///
288 /// See [`DataSinkExec`] for the common pattern of inserting a
289 /// streams of `RecordBatch`es as files to an ObjectStore.
290 ///
291 /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
292 async fn insert_into(
293 &self,
294 _state: &dyn Session,
295 _input: Arc<dyn ExecutionPlan>,
296 _insert_op: InsertOp,
297 ) -> Result<Arc<dyn ExecutionPlan>> {
298 not_impl_err!("Insert into not implemented for this table")
299 }
300}
301
302/// A factory which creates [`TableProvider`]s at runtime given a URL.
303///
304/// For example, this can be used to create a table "on the fly"
305/// from a directory of files only when that name is referenced.
306#[async_trait]
307pub trait TableProviderFactory: Debug + Sync + Send {
308 /// Create a TableProvider with the given url
309 async fn create(
310 &self,
311 state: &dyn Session,
312 cmd: &CreateExternalTable,
313 ) -> Result<Arc<dyn TableProvider>>;
314}
315
316/// A trait for table function implementations
317pub trait TableFunctionImpl: Debug + Sync + Send {
318 /// Create a table provider
319 fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
320}
321
322/// A table that uses a function to generate data
323#[derive(Debug)]
324pub struct TableFunction {
325 /// Name of the table function
326 name: String,
327 /// Function implementation
328 fun: Arc<dyn TableFunctionImpl>,
329}
330
331impl TableFunction {
332 /// Create a new table function
333 pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
334 Self { name, fun }
335 }
336
337 /// Get the name of the table function
338 pub fn name(&self) -> &str {
339 &self.name
340 }
341
342 /// Get the implementation of the table function
343 pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
344 &self.fun
345 }
346
347 /// Get the function implementation and generate a table
348 pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
349 self.fun.call(args)
350 }
351}