datafusion_catalog/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::borrow::Cow;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::session::Session;
24use arrow::datatypes::SchemaRef;
25use async_trait::async_trait;
26use datafusion_common::Result;
27use datafusion_common::{not_impl_err, Constraints, Statistics};
28use datafusion_expr::Expr;
29
30use datafusion_expr::dml::InsertOp;
31use datafusion_expr::{
32    CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
33};
34use datafusion_physical_plan::ExecutionPlan;
35
36/// A table which can be queried and modified.
37///
38/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
39///
40/// [`TableProvider`] represents a source of data which can provide data as
41/// Apache Arrow [`RecordBatch`]es. Implementations of this trait provide
42/// important information for planning such as:
43///
44/// 1. [`Self::schema`]: The schema (columns and their types) of the table
45/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
46/// 2. [`Self::scan`]: An [`ExecutionPlan`] that can read data
47///
48/// [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
49/// [`CatalogProvider`]: super::CatalogProvider
50#[async_trait]
51pub trait TableProvider: Debug + Sync + Send {
52    /// Returns the table provider as [`Any`] so that it can be
53    /// downcast to a specific implementation.
54    fn as_any(&self) -> &dyn Any;
55
56    /// Get a reference to the schema for this table
57    fn schema(&self) -> SchemaRef;
58
59    /// Get a reference to the constraints of the table.
60    /// Returns:
61    /// - `None` for tables that do not support constraints.
62    /// - `Some(&Constraints)` for tables supporting constraints.
63    /// Therefore, a `Some(&Constraints::empty())` return value indicates that
64    /// this table supports constraints, but there are no constraints.
65    fn constraints(&self) -> Option<&Constraints> {
66        None
67    }
68
69    /// Get the type of this table for metadata/catalog purposes.
70    fn table_type(&self) -> TableType;
71
72    /// Get the create statement used to create this table, if available.
73    fn get_table_definition(&self) -> Option<&str> {
74        None
75    }
76
77    /// Get the [`LogicalPlan`] of this table, if available.
78    fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
79        None
80    }
81
82    /// Get the default value for a column, if available.
83    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
84        None
85    }
86
87    /// Create an [`ExecutionPlan`] for scanning the table with optionally
88    /// specified `projection`, `filter` and `limit`, described below.
89    ///
90    /// The `ExecutionPlan` is responsible scanning the datasource's
91    /// partitions in a streaming, parallelized fashion.
92    ///
93    /// # Projection
94    ///
95    /// If specified, only a subset of columns should be returned, in the order
96    /// specified. The projection is a set of indexes of the fields in
97    /// [`Self::schema`].
98    ///
99    /// DataFusion provides the projection to scan only the columns actually
100    /// used in the query to improve performance, an optimization  called
101    /// "Projection Pushdown". Some datasources, such as Parquet, can use this
102    /// information to go significantly faster when only a subset of columns is
103    /// required.
104    ///
105    /// # Filters
106    ///
107    /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
108    /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
109    /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the
110    /// expressions are `AND`ed together).
111    ///
112    /// To enable filter pushdown you must override
113    /// [`Self::supports_filters_pushdown`] as the default implementation does
114    /// not and `filters` will be empty.
115    ///
116    /// DataFusion pushes filtering into the scans whenever possible
117    /// ("Filter Pushdown"), and depending on the format and the
118    /// implementation of the format, evaluating the predicate during the scan
119    /// can increase performance significantly.
120    ///
121    /// ## Note: Some columns may appear *only* in Filters
122    ///
123    /// In certain cases, a query may only use a certain column in a Filter that
124    /// has been completely pushed down to the scan. In this case, the
125    /// projection will not contain all the columns found in the filter
126    /// expressions.
127    ///
128    /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
129    ///
130    /// ```text
131    /// ┌────────────────────┐
132    /// │  Projection(t.a)   │
133    /// └────────────────────┘
134    ///            ▲
135    ///            │
136    ///            │
137    /// ┌────────────────────┐     Filter     ┌────────────────────┐   Projection    ┌────────────────────┐
138    /// │  Filter(t.b > 5)   │────Pushdown──▶ │  Projection(t.a)   │ ───Pushdown───▶ │  Projection(t.a)   │
139    /// └────────────────────┘                └────────────────────┘                 └────────────────────┘
140    ///            ▲                                     ▲                                      ▲
141    ///            │                                     │                                      │
142    ///            │                                     │                           ┌────────────────────┐
143    /// ┌────────────────────┐                ┌────────────────────┐                 │        Scan        │
144    /// │        Scan        │                │        Scan        │                 │  filter=(t.b > 5)  │
145    /// └────────────────────┘                │  filter=(t.b > 5)  │                 │  projection=(t.a)  │
146    ///                                       └────────────────────┘                 └────────────────────┘
147    ///
148    /// Initial Plan                  If `TableProviderFilterPushDown`           Projection pushdown notes that
149    ///                               returns true, filter pushdown              the scan only needs t.a
150    ///                               pushes the filter into the scan
151    ///                                                                          BUT internally evaluating the
152    ///                                                                          predicate still requires t.b
153    /// ```
154    ///
155    /// # Limit
156    ///
157    /// If `limit` is specified,  must only produce *at least* this many rows,
158    /// (though it may return more).  Like Projection Pushdown and Filter
159    /// Pushdown, DataFusion pushes `LIMIT`s  as far down in the plan as
160    /// possible, called "Limit Pushdown" as some sources can use this
161    /// information to improve their performance. Note that if there are any
162    /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is
163    /// because inexact filters do not guarantee that every filtered row is
164    /// removed, so applying the limit could lead to too few rows being available
165    /// to return as a final result.
166    async fn scan(
167        &self,
168        state: &dyn Session,
169        projection: Option<&Vec<usize>>,
170        filters: &[Expr],
171        limit: Option<usize>,
172    ) -> Result<Arc<dyn ExecutionPlan>>;
173
174    /// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
175    ///
176    /// This method uses [`ScanArgs`] to pass scan parameters in a structured way
177    /// and returns a [`ScanResult`] containing the execution plan.
178    ///
179    /// Table providers can override this method to take advantage of additional
180    /// parameters like the upcoming `preferred_ordering` that may not be available through
181    /// other scan methods.
182    ///
183    /// # Arguments
184    /// * `state` - The session state containing configuration and context
185    /// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences
186    ///
187    /// # Returns
188    /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
189    ///
190    /// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
191    async fn scan_with_args<'a>(
192        &self,
193        state: &dyn Session,
194        args: ScanArgs<'a>,
195    ) -> Result<ScanResult> {
196        let filters = args.filters().unwrap_or(&[]);
197        let projection = args.projection().map(|p| p.to_vec());
198        let limit = args.limit();
199        let plan = self
200            .scan(state, projection.as_ref(), filters, limit)
201            .await?;
202        Ok(plan.into())
203    }
204
205    /// Specify if DataFusion should provide filter expressions to the
206    /// TableProvider to apply *during* the scan.
207    ///
208    /// Some TableProviders can evaluate filters more efficiently than the
209    /// `Filter` operator in DataFusion, for example by using an index.
210    ///
211    /// # Parameters and Return Value
212    ///
213    /// The return `Vec` must have one element for each element of the `filters`
214    /// argument. The value of each element indicates if the TableProvider can
215    /// apply the corresponding filter during the scan. The position in the return
216    /// value corresponds to the expression in the `filters` parameter.
217    ///
218    /// If the length of the resulting `Vec` does not match the `filters` input
219    /// an error will be thrown.
220    ///
221    /// Each element in the resulting `Vec` is one of the following:
222    /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
223    /// during scan
224    /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
225    ///
226    /// By default, this function returns [`Unsupported`] for all filters,
227    /// meaning no filters will be provided to [`Self::scan`].
228    ///
229    /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
230    /// [`Exact`]: TableProviderFilterPushDown::Exact
231    /// [`Inexact`]: TableProviderFilterPushDown::Inexact
232    /// # Example
233    ///
234    /// ```rust
235    /// # use std::any::Any;
236    /// # use std::sync::Arc;
237    /// # use arrow::datatypes::SchemaRef;
238    /// # use async_trait::async_trait;
239    /// # use datafusion_catalog::{TableProvider, Session};
240    /// # use datafusion_common::Result;
241    /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
242    /// # use datafusion_physical_plan::ExecutionPlan;
243    /// // Define a struct that implements the TableProvider trait
244    /// #[derive(Debug)]
245    /// struct TestDataSource {}
246    ///
247    /// #[async_trait]
248    /// impl TableProvider for TestDataSource {
249    /// # fn as_any(&self) -> &dyn Any { todo!() }
250    /// # fn schema(&self) -> SchemaRef { todo!() }
251    /// # fn table_type(&self) -> TableType { todo!() }
252    /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
253    ///         todo!()
254    /// # }
255    ///     // Override the supports_filters_pushdown to evaluate which expressions
256    ///     // to accept as pushdown predicates.
257    ///     fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
258    ///         // Process each filter
259    ///         let support: Vec<_> = filters.iter().map(|expr| {
260    ///           match expr {
261    ///             // This example only supports a between expr with a single column named "c1".
262    ///             Expr::Between(between_expr) => {
263    ///                 between_expr.expr
264    ///                 .try_as_col()
265    ///                 .map(|column| {
266    ///                     if column.name == "c1" {
267    ///                         TableProviderFilterPushDown::Exact
268    ///                     } else {
269    ///                         TableProviderFilterPushDown::Unsupported
270    ///                     }
271    ///                 })
272    ///                 // If there is no column in the expr set the filter to unsupported.
273    ///                 .unwrap_or(TableProviderFilterPushDown::Unsupported)
274    ///             }
275    ///             _ => {
276    ///                 // For all other cases return Unsupported.
277    ///                 TableProviderFilterPushDown::Unsupported
278    ///             }
279    ///         }
280    ///     }).collect();
281    ///     Ok(support)
282    ///     }
283    /// }
284    /// ```
285    fn supports_filters_pushdown(
286        &self,
287        filters: &[&Expr],
288    ) -> Result<Vec<TableProviderFilterPushDown>> {
289        Ok(vec![
290            TableProviderFilterPushDown::Unsupported;
291            filters.len()
292        ])
293    }
294
295    /// Get statistics for this table, if available
296    /// Although not presently used in mainline DataFusion, this allows implementation specific
297    /// behavior for downstream repositories, in conjunction with specialized optimizer rules to
298    /// perform operations such as re-ordering of joins.
299    fn statistics(&self) -> Option<Statistics> {
300        None
301    }
302
303    /// Return an [`ExecutionPlan`] to insert data into this table, if
304    /// supported.
305    ///
306    /// The returned plan should return a single row in a UInt64
307    /// column called "count" such as the following
308    ///
309    /// ```text
310    /// +-------+,
311    /// | count |,
312    /// +-------+,
313    /// | 6     |,
314    /// +-------+,
315    /// ```
316    ///
317    /// # See Also
318    ///
319    /// See [`DataSinkExec`] for the common pattern of inserting a
320    /// streams of `RecordBatch`es as files to an ObjectStore.
321    ///
322    /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
323    async fn insert_into(
324        &self,
325        _state: &dyn Session,
326        _input: Arc<dyn ExecutionPlan>,
327        _insert_op: InsertOp,
328    ) -> Result<Arc<dyn ExecutionPlan>> {
329        not_impl_err!("Insert into not implemented for this table")
330    }
331}
332
333/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
334#[derive(Debug, Clone, Default)]
335pub struct ScanArgs<'a> {
336    filters: Option<&'a [Expr]>,
337    projection: Option<&'a [usize]>,
338    limit: Option<usize>,
339}
340
341impl<'a> ScanArgs<'a> {
342    /// Set the column projection for the scan.
343    ///
344    /// The projection is a list of column indices from [`TableProvider::schema`]
345    /// that should be included in the scan results. If `None`, all columns are included.
346    ///
347    /// # Arguments
348    /// * `projection` - Optional slice of column indices to project
349    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
350        self.projection = projection;
351        self
352    }
353
354    /// Get the column projection for the scan.
355    ///
356    /// Returns a reference to the projection column indices, or `None` if
357    /// no projection was specified (meaning all columns should be included).
358    pub fn projection(&self) -> Option<&'a [usize]> {
359        self.projection
360    }
361
362    /// Set the filter expressions for the scan.
363    ///
364    /// Filters are boolean expressions that should be evaluated during the scan
365    /// to reduce the number of rows returned. All expressions are combined with AND logic.
366    /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
367    ///
368    /// # Arguments
369    /// * `filters` - Optional slice of filter expressions
370    pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self {
371        self.filters = filters;
372        self
373    }
374
375    /// Get the filter expressions for the scan.
376    ///
377    /// Returns a reference to the filter expressions, or `None` if no filters were specified.
378    pub fn filters(&self) -> Option<&'a [Expr]> {
379        self.filters
380    }
381
382    /// Set the maximum number of rows to return from the scan.
383    ///
384    /// If specified, the scan should return at most this many rows. This is typically
385    /// used to optimize queries with `LIMIT` clauses.
386    ///
387    /// # Arguments
388    /// * `limit` - Optional maximum number of rows to return
389    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
390        self.limit = limit;
391        self
392    }
393
394    /// Get the maximum number of rows to return from the scan.
395    ///
396    /// Returns the row limit, or `None` if no limit was specified.
397    pub fn limit(&self) -> Option<usize> {
398        self.limit
399    }
400}
401
402/// Result of a table scan operation from [`TableProvider::scan_with_args`].
403#[derive(Debug, Clone)]
404pub struct ScanResult {
405    /// The ExecutionPlan to run.
406    plan: Arc<dyn ExecutionPlan>,
407}
408
409impl ScanResult {
410    /// Create a new `ScanResult` with the given execution plan.
411    ///
412    /// # Arguments
413    /// * `plan` - The execution plan that will perform the table scan
414    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
415        Self { plan }
416    }
417
418    /// Get a reference to the execution plan for this scan result.
419    ///
420    /// Returns a reference to the [`ExecutionPlan`] that will perform
421    /// the actual table scanning and data retrieval.
422    pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
423        &self.plan
424    }
425
426    /// Consume this ScanResult and return the execution plan.
427    ///
428    /// Returns the owned [`ExecutionPlan`] that will perform
429    /// the actual table scanning and data retrieval.
430    pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
431        self.plan
432    }
433}
434
435impl From<Arc<dyn ExecutionPlan>> for ScanResult {
436    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
437        Self::new(plan)
438    }
439}
440
441/// A factory which creates [`TableProvider`]s at runtime given a URL.
442///
443/// For example, this can be used to create a table "on the fly"
444/// from a directory of files only when that name is referenced.
445#[async_trait]
446pub trait TableProviderFactory: Debug + Sync + Send {
447    /// Create a TableProvider with the given url
448    async fn create(
449        &self,
450        state: &dyn Session,
451        cmd: &CreateExternalTable,
452    ) -> Result<Arc<dyn TableProvider>>;
453}
454
455/// A trait for table function implementations
456pub trait TableFunctionImpl: Debug + Sync + Send {
457    /// Create a table provider
458    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
459}
460
461/// A table that uses a function to generate data
462#[derive(Clone, Debug)]
463pub struct TableFunction {
464    /// Name of the table function
465    name: String,
466    /// Function implementation
467    fun: Arc<dyn TableFunctionImpl>,
468}
469
470impl TableFunction {
471    /// Create a new table function
472    pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
473        Self { name, fun }
474    }
475
476    /// Get the name of the table function
477    pub fn name(&self) -> &str {
478        &self.name
479    }
480
481    /// Get the implementation of the table function
482    pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
483        &self.fun
484    }
485
486    /// Get the function implementation and generate a table
487    pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
488        self.fun.call(args)
489    }
490}