// Source file: datafusion_catalog/table.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

use crate::session::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{Constraints, Statistics, not_impl_err};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
    CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;

36/// A table which can be queried and modified.
37///
38/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
39///
40/// [`TableProvider`] represents a source of data which can provide data as
41/// Apache Arrow [`RecordBatch`]es. Implementations of this trait provide
42/// important information for planning such as:
43///
44/// 1. [`Self::schema`]: The schema (columns and their types) of the table
45/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
46/// 2. [`Self::scan`]: An [`ExecutionPlan`] that can read data
47///
48/// [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
49/// [`CatalogProvider`]: super::CatalogProvider
50#[async_trait]
51pub trait TableProvider: Debug + Sync + Send {
52    /// Returns the table provider as [`Any`] so that it can be
53    /// downcast to a specific implementation.
54    fn as_any(&self) -> &dyn Any;
55
56    /// Get a reference to the schema for this table
57    fn schema(&self) -> SchemaRef;
58
59    /// Get a reference to the constraints of the table.
60    /// Returns:
61    /// - `None` for tables that do not support constraints.
62    /// - `Some(&Constraints)` for tables supporting constraints.
63    /// Therefore, a `Some(&Constraints::empty())` return value indicates that
64    /// this table supports constraints, but there are no constraints.
65    fn constraints(&self) -> Option<&Constraints> {
66        None
67    }
68
69    /// Get the type of this table for metadata/catalog purposes.
70    fn table_type(&self) -> TableType;
71
72    /// Get the create statement used to create this table, if available.
73    fn get_table_definition(&self) -> Option<&str> {
74        None
75    }
76
77    /// Get the [`LogicalPlan`] of this table, if available.
78    fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
79        None
80    }
81
82    /// Get the default value for a column, if available.
83    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
84        None
85    }
86
87    /// Create an [`ExecutionPlan`] for scanning the table with optionally
88    /// specified `projection`, `filter` and `limit`, described below.
89    ///
90    /// The `ExecutionPlan` is responsible scanning the datasource's
91    /// partitions in a streaming, parallelized fashion.
92    ///
93    /// # Projection
94    ///
95    /// If specified, only a subset of columns should be returned, in the order
96    /// specified. The projection is a set of indexes of the fields in
97    /// [`Self::schema`].
98    ///
99    /// DataFusion provides the projection to scan only the columns actually
100    /// used in the query to improve performance, an optimization  called
101    /// "Projection Pushdown". Some datasources, such as Parquet, can use this
102    /// information to go significantly faster when only a subset of columns is
103    /// required.
104    ///
105    /// # Filters
106    ///
107    /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
108    /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
109    /// which *all* of the `Expr`s evaluate to `true` must be returned (aka the
110    /// expressions are `AND`ed together).
111    ///
112    /// To enable filter pushdown you must override
113    /// [`Self::supports_filters_pushdown`] as the default implementation does
114    /// not and `filters` will be empty.
115    ///
116    /// DataFusion pushes filtering into the scans whenever possible
117    /// ("Filter Pushdown"), and depending on the format and the
118    /// implementation of the format, evaluating the predicate during the scan
119    /// can increase performance significantly.
120    ///
121    /// ## Note: Some columns may appear *only* in Filters
122    ///
123    /// In certain cases, a query may only use a certain column in a Filter that
124    /// has been completely pushed down to the scan. In this case, the
125    /// projection will not contain all the columns found in the filter
126    /// expressions.
127    ///
128    /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
129    ///
130    /// ```text
131    /// ┌────────────────────┐
132    /// │  Projection(t.a)   │
133    /// └────────────────────┘
134    ///            ▲
135    ///            │
136    ///            │
137    /// ┌────────────────────┐     Filter     ┌────────────────────┐   Projection    ┌────────────────────┐
138    /// │  Filter(t.b > 5)   │────Pushdown──▶ │  Projection(t.a)   │ ───Pushdown───▶ │  Projection(t.a)   │
139    /// └────────────────────┘                └────────────────────┘                 └────────────────────┘
140    ///            ▲                                     ▲                                      ▲
141    ///            │                                     │                                      │
142    ///            │                                     │                           ┌────────────────────┐
143    /// ┌────────────────────┐                ┌────────────────────┐                 │        Scan        │
144    /// │        Scan        │                │        Scan        │                 │  filter=(t.b > 5)  │
145    /// └────────────────────┘                │  filter=(t.b > 5)  │                 │  projection=(t.a)  │
146    ///                                       └────────────────────┘                 └────────────────────┘
147    ///
148    /// Initial Plan                  If `TableProviderFilterPushDown`           Projection pushdown notes that
149    ///                               returns true, filter pushdown              the scan only needs t.a
150    ///                               pushes the filter into the scan
151    ///                                                                          BUT internally evaluating the
152    ///                                                                          predicate still requires t.b
153    /// ```
154    ///
155    /// # Limit
156    ///
157    /// If `limit` is specified,  must only produce *at least* this many rows,
158    /// (though it may return more).  Like Projection Pushdown and Filter
159    /// Pushdown, DataFusion pushes `LIMIT`s  as far down in the plan as
160    /// possible, called "Limit Pushdown" as some sources can use this
161    /// information to improve their performance. Note that if there are any
162    /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is
163    /// because inexact filters do not guarantee that every filtered row is
164    /// removed, so applying the limit could lead to too few rows being available
165    /// to return as a final result.
166    async fn scan(
167        &self,
168        state: &dyn Session,
169        projection: Option<&Vec<usize>>,
170        filters: &[Expr],
171        limit: Option<usize>,
172    ) -> Result<Arc<dyn ExecutionPlan>>;
173
174    /// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
175    ///
176    /// This method uses [`ScanArgs`] to pass scan parameters in a structured way
177    /// and returns a [`ScanResult`] containing the execution plan.
178    ///
179    /// Table providers can override this method to take advantage of additional
180    /// parameters like the upcoming `preferred_ordering` that may not be available through
181    /// other scan methods.
182    ///
183    /// # Arguments
184    /// * `state` - The session state containing configuration and context
185    /// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences
186    ///
187    /// # Returns
188    /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
189    ///
190    /// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
191    async fn scan_with_args<'a>(
192        &self,
193        state: &dyn Session,
194        args: ScanArgs<'a>,
195    ) -> Result<ScanResult> {
196        let filters = args.filters().unwrap_or(&[]);
197        let projection = args.projection().map(|p| p.to_vec());
198        let limit = args.limit();
199        let plan = self
200            .scan(state, projection.as_ref(), filters, limit)
201            .await?;
202        Ok(plan.into())
203    }
204
205    /// Specify if DataFusion should provide filter expressions to the
206    /// TableProvider to apply *during* the scan.
207    ///
208    /// Some TableProviders can evaluate filters more efficiently than the
209    /// `Filter` operator in DataFusion, for example by using an index.
210    ///
211    /// # Parameters and Return Value
212    ///
213    /// The return `Vec` must have one element for each element of the `filters`
214    /// argument. The value of each element indicates if the TableProvider can
215    /// apply the corresponding filter during the scan. The position in the return
216    /// value corresponds to the expression in the `filters` parameter.
217    ///
218    /// If the length of the resulting `Vec` does not match the `filters` input
219    /// an error will be thrown.
220    ///
221    /// Each element in the resulting `Vec` is one of the following:
222    /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
223    /// during scan
224    /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
225    ///
226    /// By default, this function returns [`Unsupported`] for all filters,
227    /// meaning no filters will be provided to [`Self::scan`].
228    ///
229    /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
230    /// [`Exact`]: TableProviderFilterPushDown::Exact
231    /// [`Inexact`]: TableProviderFilterPushDown::Inexact
232    /// # Example
233    ///
234    /// ```rust
235    /// # use std::any::Any;
236    /// # use std::sync::Arc;
237    /// # use arrow::datatypes::SchemaRef;
238    /// # use async_trait::async_trait;
239    /// # use datafusion_catalog::{TableProvider, Session};
240    /// # use datafusion_common::Result;
241    /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
242    /// # use datafusion_physical_plan::ExecutionPlan;
243    /// // Define a struct that implements the TableProvider trait
244    /// #[derive(Debug)]
245    /// struct TestDataSource {}
246    ///
247    /// #[async_trait]
248    /// impl TableProvider for TestDataSource {
249    /// # fn as_any(&self) -> &dyn Any { todo!() }
250    /// # fn schema(&self) -> SchemaRef { todo!() }
251    /// # fn table_type(&self) -> TableType { todo!() }
252    /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
253    ///         todo!()
254    /// # }
255    ///     // Override the supports_filters_pushdown to evaluate which expressions
256    ///     // to accept as pushdown predicates.
257    ///     fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
258    ///         // Process each filter
259    ///         let support: Vec<_> = filters.iter().map(|expr| {
260    ///           match expr {
261    ///             // This example only supports a between expr with a single column named "c1".
262    ///             Expr::Between(between_expr) => {
263    ///                 between_expr.expr
264    ///                 .try_as_col()
265    ///                 .map(|column| {
266    ///                     if column.name == "c1" {
267    ///                         TableProviderFilterPushDown::Exact
268    ///                     } else {
269    ///                         TableProviderFilterPushDown::Unsupported
270    ///                     }
271    ///                 })
272    ///                 // If there is no column in the expr set the filter to unsupported.
273    ///                 .unwrap_or(TableProviderFilterPushDown::Unsupported)
274    ///             }
275    ///             _ => {
276    ///                 // For all other cases return Unsupported.
277    ///                 TableProviderFilterPushDown::Unsupported
278    ///             }
279    ///         }
280    ///     }).collect();
281    ///     Ok(support)
282    ///     }
283    /// }
284    /// ```
285    fn supports_filters_pushdown(
286        &self,
287        filters: &[&Expr],
288    ) -> Result<Vec<TableProviderFilterPushDown>> {
289        Ok(vec![
290            TableProviderFilterPushDown::Unsupported;
291            filters.len()
292        ])
293    }
294
295    /// Get statistics for this table, if available
296    /// Although not presently used in mainline DataFusion, this allows implementation specific
297    /// behavior for downstream repositories, in conjunction with specialized optimizer rules to
298    /// perform operations such as re-ordering of joins.
299    fn statistics(&self) -> Option<Statistics> {
300        None
301    }
302
303    /// Return an [`ExecutionPlan`] to insert data into this table, if
304    /// supported.
305    ///
306    /// The returned plan should return a single row in a UInt64
307    /// column called "count" such as the following
308    ///
309    /// ```text
310    /// +-------+,
311    /// | count |,
312    /// +-------+,
313    /// | 6     |,
314    /// +-------+,
315    /// ```
316    ///
317    /// # See Also
318    ///
319    /// See [`DataSinkExec`] for the common pattern of inserting a
320    /// streams of `RecordBatch`es as files to an ObjectStore.
321    ///
322    /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
323    async fn insert_into(
324        &self,
325        _state: &dyn Session,
326        _input: Arc<dyn ExecutionPlan>,
327        _insert_op: InsertOp,
328    ) -> Result<Arc<dyn ExecutionPlan>> {
329        not_impl_err!("Insert into not implemented for this table")
330    }
331
332    /// Delete rows matching the filter predicates.
333    ///
334    /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64).
335    /// Empty `filters` deletes all rows.
336    async fn delete_from(
337        &self,
338        _state: &dyn Session,
339        _filters: Vec<Expr>,
340    ) -> Result<Arc<dyn ExecutionPlan>> {
341        not_impl_err!("DELETE not supported for {} table", self.table_type())
342    }
343
344    /// Update rows matching the filter predicates.
345    ///
346    /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64).
347    /// Empty `filters` updates all rows.
348    async fn update(
349        &self,
350        _state: &dyn Session,
351        _assignments: Vec<(String, Expr)>,
352        _filters: Vec<Expr>,
353    ) -> Result<Arc<dyn ExecutionPlan>> {
354        not_impl_err!("UPDATE not supported for {} table", self.table_type())
355    }
356
357    /// Remove all rows from the table.
358    ///
359    /// Should return an [ExecutionPlan] producing a single row with count (UInt64),
360    /// representing the number of rows removed.
361    async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
362        not_impl_err!("TRUNCATE not supported for {} table", self.table_type())
363    }
364}
365
366/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
367#[derive(Debug, Clone, Default)]
368pub struct ScanArgs<'a> {
369    filters: Option<&'a [Expr]>,
370    projection: Option<&'a [usize]>,
371    limit: Option<usize>,
372}
373
374impl<'a> ScanArgs<'a> {
375    /// Set the column projection for the scan.
376    ///
377    /// The projection is a list of column indices from [`TableProvider::schema`]
378    /// that should be included in the scan results. If `None`, all columns are included.
379    ///
380    /// # Arguments
381    /// * `projection` - Optional slice of column indices to project
382    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
383        self.projection = projection;
384        self
385    }
386
387    /// Get the column projection for the scan.
388    ///
389    /// Returns a reference to the projection column indices, or `None` if
390    /// no projection was specified (meaning all columns should be included).
391    pub fn projection(&self) -> Option<&'a [usize]> {
392        self.projection
393    }
394
395    /// Set the filter expressions for the scan.
396    ///
397    /// Filters are boolean expressions that should be evaluated during the scan
398    /// to reduce the number of rows returned. All expressions are combined with AND logic.
399    /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
400    ///
401    /// # Arguments
402    /// * `filters` - Optional slice of filter expressions
403    pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self {
404        self.filters = filters;
405        self
406    }
407
408    /// Get the filter expressions for the scan.
409    ///
410    /// Returns a reference to the filter expressions, or `None` if no filters were specified.
411    pub fn filters(&self) -> Option<&'a [Expr]> {
412        self.filters
413    }
414
415    /// Set the maximum number of rows to return from the scan.
416    ///
417    /// If specified, the scan should return at most this many rows. This is typically
418    /// used to optimize queries with `LIMIT` clauses.
419    ///
420    /// # Arguments
421    /// * `limit` - Optional maximum number of rows to return
422    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
423        self.limit = limit;
424        self
425    }
426
427    /// Get the maximum number of rows to return from the scan.
428    ///
429    /// Returns the row limit, or `None` if no limit was specified.
430    pub fn limit(&self) -> Option<usize> {
431        self.limit
432    }
433}
434
435/// Result of a table scan operation from [`TableProvider::scan_with_args`].
436#[derive(Debug, Clone)]
437pub struct ScanResult {
438    /// The ExecutionPlan to run.
439    plan: Arc<dyn ExecutionPlan>,
440}
441
442impl ScanResult {
443    /// Create a new `ScanResult` with the given execution plan.
444    ///
445    /// # Arguments
446    /// * `plan` - The execution plan that will perform the table scan
447    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
448        Self { plan }
449    }
450
451    /// Get a reference to the execution plan for this scan result.
452    ///
453    /// Returns a reference to the [`ExecutionPlan`] that will perform
454    /// the actual table scanning and data retrieval.
455    pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
456        &self.plan
457    }
458
459    /// Consume this ScanResult and return the execution plan.
460    ///
461    /// Returns the owned [`ExecutionPlan`] that will perform
462    /// the actual table scanning and data retrieval.
463    pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
464        self.plan
465    }
466}
467
468impl From<Arc<dyn ExecutionPlan>> for ScanResult {
469    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
470        Self::new(plan)
471    }
472}
473
474/// A factory which creates [`TableProvider`]s at runtime given a URL.
475///
476/// For example, this can be used to create a table "on the fly"
477/// from a directory of files only when that name is referenced.
478#[async_trait]
479pub trait TableProviderFactory: Debug + Sync + Send {
480    /// Create a TableProvider with the given url
481    async fn create(
482        &self,
483        state: &dyn Session,
484        cmd: &CreateExternalTable,
485    ) -> Result<Arc<dyn TableProvider>>;
486}
487
488/// A trait for table function implementations
489pub trait TableFunctionImpl: Debug + Sync + Send {
490    /// Create a table provider
491    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
492}
493
494/// A table that uses a function to generate data
495#[derive(Clone, Debug)]
496pub struct TableFunction {
497    /// Name of the table function
498    name: String,
499    /// Function implementation
500    fun: Arc<dyn TableFunctionImpl>,
501}
502
503impl TableFunction {
504    /// Create a new table function
505    pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
506        Self { name, fun }
507    }
508
509    /// Get the name of the table function
510    pub fn name(&self) -> &str {
511        &self.name
512    }
513
514    /// Get the implementation of the table function
515    pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
516        &self.fun
517    }
518
519    /// Get the function implementation and generate a table
520    pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
521        self.fun.call(args)
522    }
523}