Skip to main content

datafusion_catalog/
table.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::any::Any;
19use std::borrow::Cow;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::session::Session;
24use arrow::datatypes::SchemaRef;
25use async_trait::async_trait;
26use datafusion_common::{Constraints, Statistics, not_impl_err};
27use datafusion_common::{Result, internal_err};
28use datafusion_expr::Expr;
29
30use datafusion_expr::dml::InsertOp;
31use datafusion_expr::{
32    CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
33};
34use datafusion_physical_plan::ExecutionPlan;
35
36/// A table which can be queried and modified.
37///
38/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
39///
40/// [`TableProvider`] represents a source of data which can provide data as
41/// Apache Arrow [`RecordBatch`]es. Implementations of this trait provide
42/// important information for planning such as:
43///
44/// 1. [`Self::schema`]: The schema (columns and their types) of the table
45/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
46/// 2. [`Self::scan`]: An [`ExecutionPlan`] that can read data
47///
48/// [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
49/// [`CatalogProvider`]: super::CatalogProvider
50#[async_trait]
51pub trait TableProvider: Any + Debug + Sync + Send {
52    /// Get a reference to the schema for this table
53    fn schema(&self) -> SchemaRef;
54
55    /// Get a reference to the constraints of the table.
56    /// Returns:
57    /// - `None` for tables that do not support constraints.
58    /// - `Some(&Constraints)` for tables supporting constraints.
59    /// Therefore, a `Some(&Constraints::empty())` return value indicates that
60    /// this table supports constraints, but there are no constraints.
61    fn constraints(&self) -> Option<&Constraints> {
62        None
63    }
64
65    /// Get the type of this table for metadata/catalog purposes.
66    fn table_type(&self) -> TableType;
67
68    /// Get the create statement used to create this table, if available.
69    fn get_table_definition(&self) -> Option<&str> {
70        None
71    }
72
73    /// Get the [`LogicalPlan`] of this table, if available.
74    fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
75        None
76    }
77
78    /// Get the default value for a column, if available.
79    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
80        None
81    }
82
83    /// Create an [`ExecutionPlan`] for scanning the table with optional
84    /// `projection`, `filter`, and `limit`, described below.
85    ///
86    /// The returned `ExecutionPlan` is responsible for scanning the datasource's
87    /// partitions in a streaming, parallelized fashion.
88    ///
89    /// # Projection
90    ///
91    /// If specified, only a subset of columns should be returned, in the order
92    /// specified. The projection is a set of indexes of the fields in
93    /// [`Self::schema`].
94    ///
95    /// DataFusion provides the projection so the scan reads only the columns
96    /// actually used in the query, an optimization called "Projection
97    /// Pushdown". Some datasources, such as Parquet, can use this information
98    /// to go significantly faster when only a subset of columns is required.
99    ///
100    /// # Filters
101    ///
102    /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
103    /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
104    /// which *all* of the `Expr`s evaluate to `true` must be returned (that is,
105    /// the expressions are `AND`ed together).
106    ///
107    /// To enable filter pushdown, override
108    /// [`Self::supports_filters_pushdown`]. The default implementation does not
109    /// push down filters, and `filters` will be empty.
110    ///
111    /// DataFusion pushes filters into scans whenever possible ("Filter
112    /// Pushdown"). Depending on the data format and implementation, evaluating
113    /// predicates during the scan can significantly improve performance.
114    ///
115    /// ## Note: Some columns may appear *only* in Filters
116    ///
117    /// In some cases, a query may use a column only in a filter and the
118    /// projection will not contain all columns referenced by the filter
119    /// expressions.
120    ///
121    /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
122    ///
123    /// ```text
124    /// ┌────────────────────┐
125    /// │  Projection(t.a)   │
126    /// └────────────────────┘
127    ///            ▲
128    ///            │
129    ///            │
130    /// ┌────────────────────┐     Filter     ┌────────────────────┐   Projection    ┌────────────────────┐
131    /// │  Filter(t.b > 5)   │────Pushdown──▶ │  Projection(t.a)   │ ───Pushdown───▶ │  Projection(t.a)   │
132    /// └────────────────────┘                └────────────────────┘                 └────────────────────┘
133    ///            ▲                                     ▲                                      ▲
134    ///            │                                     │                                      │
135    ///            │                                     │                           ┌────────────────────┐
136    /// ┌────────────────────┐                ┌────────────────────┐                 │        Scan        │
137    /// │        Scan        │                │        Scan        │                 │  filter=(t.b > 5)  │
138    /// └────────────────────┘                │  filter=(t.b > 5)  │                 │  projection=(t.a)  │
139    ///                                       └────────────────────┘                 └────────────────────┘
140    ///
141    /// Initial Plan                  If `TableProviderFilterPushDown`           Projection pushdown notes that
142    ///                               returns true, filter pushdown              the scan only needs t.a
143    ///                               pushes the filter into the scan
144    ///                                                                          BUT internally evaluating the
145    ///                                                                          predicate still requires t.b
146    /// ```
147    ///
148    /// # Limit
149    ///
150    /// If `limit` is specified, the scan must produce *at least* this many
151    /// rows, though it may return more. Like Projection Pushdown and Filter
152    /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as
153    /// possible. This is called "Limit Pushdown", and some sources can use the
154    /// information to improve performance.
155    ///
156    /// Note: If any pushed-down filters are `Inexact`, the `LIMIT` cannot be
157    /// pushed down. Inexact filters do not guarantee that every filtered row is
158    /// removed, so applying the limit could leave too few rows to return in the
159    /// final result.
160    ///
161    /// # Evaluation Order
162    ///
163    /// The logical evaluation order is `filters`, then `limit`, then
164    /// `projection`.
165    ///
166    /// Note that `limit` applies to the filtered result, not to the unfiltered
167    /// input, and `projection` affects only which columns are returned, not
168    /// which rows qualify.
169    ///
170    /// For example, if a scan receives:
171    ///
172    /// - `projection = [a]`
173    /// - `filters = [b > 5]`
174    /// - `limit = Some(3)`
175    ///
176    /// It must logically produce results equivalent to:
177    ///
178    /// ```text
179    /// PROJECTION a (LIMIT 3 (SCAN WHERE b > 5))
180    /// ```
181    ///
182    /// As noted above, columns referenced only by pushed-down filters may be
183    /// absent from `projection`.
184    async fn scan(
185        &self,
186        state: &dyn Session,
187        projection: Option<&Vec<usize>>,
188        filters: &[Expr],
189        limit: Option<usize>,
190    ) -> Result<Arc<dyn ExecutionPlan>>;
191
192    /// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
193    ///
194    /// This method uses [`ScanArgs`] to pass scan parameters in a structured way
195    /// and returns a [`ScanResult`] containing the execution plan.
196    ///
197    /// Table providers can override this method to take advantage of additional
198    /// parameters like the upcoming `preferred_ordering` that may not be available through
199    /// other scan methods.
200    ///
201    /// # Arguments
202    /// * `state` - The session state containing configuration and context
203    /// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences
204    ///
205    /// # Returns
206    /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
207    ///
208    /// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
209    async fn scan_with_args<'a>(
210        &self,
211        state: &dyn Session,
212        args: ScanArgs<'a>,
213    ) -> Result<ScanResult> {
214        let filters = args.filters().unwrap_or(&[]);
215        let projection = args.projection().map(|p| p.to_vec());
216        let limit = args.limit();
217        let plan = self
218            .scan(state, projection.as_ref(), filters, limit)
219            .await?;
220        Ok(plan.into())
221    }
222
223    /// Specify if DataFusion should provide filter expressions to the
224    /// TableProvider to apply *during* the scan.
225    ///
226    /// Some TableProviders can evaluate filters more efficiently than the
227    /// `Filter` operator in DataFusion, for example by using an index.
228    ///
229    /// # Parameters and Return Value
230    ///
231    /// The return `Vec` must have one element for each element of the `filters`
232    /// argument. The value of each element indicates if the TableProvider can
233    /// apply the corresponding filter during the scan. The position in the return
234    /// value corresponds to the expression in the `filters` parameter.
235    ///
236    /// If the length of the resulting `Vec` does not match the `filters` input
237    /// an error will be thrown.
238    ///
239    /// Each element in the resulting `Vec` is one of the following:
240    /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
241    /// during scan
242    /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
243    ///
244    /// By default, this function returns [`Unsupported`] for all filters,
245    /// meaning no filters will be provided to [`Self::scan`].
246    ///
247    /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
248    /// [`Exact`]: TableProviderFilterPushDown::Exact
249    /// [`Inexact`]: TableProviderFilterPushDown::Inexact
250    /// # Example
251    ///
252    /// ```rust
253    /// # use std::any::Any;
254    /// # use std::sync::Arc;
255    /// # use arrow::datatypes::SchemaRef;
256    /// # use async_trait::async_trait;
257    /// # use datafusion_catalog::{TableProvider, Session};
258    /// # use datafusion_common::Result;
259    /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
260    /// # use datafusion_physical_plan::ExecutionPlan;
261    /// // Define a struct that implements the TableProvider trait
262    /// #[derive(Debug)]
263    /// struct TestDataSource {}
264    ///
265    /// #[async_trait]
266    /// impl TableProvider for TestDataSource {
267    /// # fn schema(&self) -> SchemaRef { todo!() }
268    /// # fn table_type(&self) -> TableType { todo!() }
269    /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
270    ///         todo!()
271    /// # }
272    ///     // Override the supports_filters_pushdown to evaluate which expressions
273    ///     // to accept as pushdown predicates.
274    ///     fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
275    ///         // Process each filter
276    ///         let support: Vec<_> = filters.iter().map(|expr| {
277    ///           match expr {
278    ///             // This example only supports a between expr with a single column named "c1".
279    ///             Expr::Between(between_expr) => {
280    ///                 between_expr.expr
281    ///                 .try_as_col()
282    ///                 .map(|column| {
283    ///                     if column.name == "c1" {
284    ///                         TableProviderFilterPushDown::Exact
285    ///                     } else {
286    ///                         TableProviderFilterPushDown::Unsupported
287    ///                     }
288    ///                 })
289    ///                 // If there is no column in the expr set the filter to unsupported.
290    ///                 .unwrap_or(TableProviderFilterPushDown::Unsupported)
291    ///             }
292    ///             _ => {
293    ///                 // For all other cases return Unsupported.
294    ///                 TableProviderFilterPushDown::Unsupported
295    ///             }
296    ///         }
297    ///     }).collect();
298    ///     Ok(support)
299    ///     }
300    /// }
301    /// ```
302    fn supports_filters_pushdown(
303        &self,
304        filters: &[&Expr],
305    ) -> Result<Vec<TableProviderFilterPushDown>> {
306        Ok(vec![
307            TableProviderFilterPushDown::Unsupported;
308            filters.len()
309        ])
310    }
311
312    /// Get statistics for this table, if available
313    /// Although not presently used in mainline DataFusion, this allows implementation specific
314    /// behavior for downstream repositories, in conjunction with specialized optimizer rules to
315    /// perform operations such as re-ordering of joins.
316    fn statistics(&self) -> Option<Statistics> {
317        None
318    }
319
320    /// Return an [`ExecutionPlan`] to insert data into this table, if
321    /// supported.
322    ///
323    /// The returned plan should return a single row in a UInt64
324    /// column called "count" such as the following
325    ///
326    /// ```text
327    /// +-------+,
328    /// | count |,
329    /// +-------+,
330    /// | 6     |,
331    /// +-------+,
332    /// ```
333    ///
334    /// # See Also
335    ///
336    /// See [`DataSinkExec`] for the common pattern of inserting a
337    /// streams of `RecordBatch`es as files to an ObjectStore.
338    ///
339    /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
340    async fn insert_into(
341        &self,
342        _state: &dyn Session,
343        _input: Arc<dyn ExecutionPlan>,
344        _insert_op: InsertOp,
345    ) -> Result<Arc<dyn ExecutionPlan>> {
346        not_impl_err!("Insert into not implemented for this table")
347    }
348
349    /// Delete rows matching the filter predicates.
350    ///
351    /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64).
352    /// Empty `filters` deletes all rows.
353    async fn delete_from(
354        &self,
355        _state: &dyn Session,
356        _filters: Vec<Expr>,
357    ) -> Result<Arc<dyn ExecutionPlan>> {
358        not_impl_err!("DELETE not supported for {} table", self.table_type())
359    }
360
361    /// Update rows matching the filter predicates.
362    ///
363    /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64).
364    /// Empty `filters` updates all rows.
365    async fn update(
366        &self,
367        _state: &dyn Session,
368        _assignments: Vec<(String, Expr)>,
369        _filters: Vec<Expr>,
370    ) -> Result<Arc<dyn ExecutionPlan>> {
371        not_impl_err!("UPDATE not supported for {} table", self.table_type())
372    }
373
374    /// Remove all rows from the table.
375    ///
376    /// Should return an [ExecutionPlan] producing a single row with count (UInt64),
377    /// representing the number of rows removed.
378    async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
379        not_impl_err!("TRUNCATE not supported for {} table", self.table_type())
380    }
381}
382
383impl dyn TableProvider {
384    /// Returns `true` if the table provider is of type `T`.
385    ///
386    /// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
387    /// called on `Arc<dyn TableProvider>` via auto-deref.
388    pub fn is<T: TableProvider>(&self) -> bool {
389        (self as &dyn Any).is::<T>()
390    }
391
392    /// Attempts to downcast this table provider to a concrete type `T`,
393    /// returning `None` if the provider is not of that type.
394    ///
395    /// Works correctly when called on `Arc<dyn TableProvider>` via auto-deref,
396    /// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
397    /// downcast the `Arc` itself.
398    pub fn downcast_ref<T: TableProvider>(&self) -> Option<&T> {
399        (self as &dyn Any).downcast_ref()
400    }
401}
402
403/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
404#[derive(Debug, Clone, Default)]
405pub struct ScanArgs<'a> {
406    filters: Option<&'a [Expr]>,
407    projection: Option<&'a [usize]>,
408    limit: Option<usize>,
409}
410
411impl<'a> ScanArgs<'a> {
412    /// Set the column projection for the scan.
413    ///
414    /// The projection is a list of column indices from [`TableProvider::schema`]
415    /// that should be included in the scan results. If `None`, all columns are included.
416    ///
417    /// # Arguments
418    /// * `projection` - Optional slice of column indices to project
419    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
420        self.projection = projection;
421        self
422    }
423
424    /// Get the column projection for the scan.
425    ///
426    /// Returns a reference to the projection column indices, or `None` if
427    /// no projection was specified (meaning all columns should be included).
428    pub fn projection(&self) -> Option<&'a [usize]> {
429        self.projection
430    }
431
432    /// Set the filter expressions for the scan.
433    ///
434    /// Filters are boolean expressions that should be evaluated during the scan
435    /// to reduce the number of rows returned. All expressions are combined with AND logic.
436    /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
437    ///
438    /// # Arguments
439    /// * `filters` - Optional slice of filter expressions
440    pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self {
441        self.filters = filters;
442        self
443    }
444
445    /// Get the filter expressions for the scan.
446    ///
447    /// Returns a reference to the filter expressions, or `None` if no filters were specified.
448    pub fn filters(&self) -> Option<&'a [Expr]> {
449        self.filters
450    }
451
452    /// Set the maximum number of rows to return from the scan.
453    ///
454    /// If specified, the scan should return at most this many rows. This is typically
455    /// used to optimize queries with `LIMIT` clauses.
456    ///
457    /// # Arguments
458    /// * `limit` - Optional maximum number of rows to return
459    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
460        self.limit = limit;
461        self
462    }
463
464    /// Get the maximum number of rows to return from the scan.
465    ///
466    /// Returns the row limit, or `None` if no limit was specified.
467    pub fn limit(&self) -> Option<usize> {
468        self.limit
469    }
470}
471
472/// Result of a table scan operation from [`TableProvider::scan_with_args`].
473#[derive(Debug, Clone)]
474pub struct ScanResult {
475    /// The ExecutionPlan to run.
476    plan: Arc<dyn ExecutionPlan>,
477}
478
479impl ScanResult {
480    /// Create a new `ScanResult` with the given execution plan.
481    ///
482    /// # Arguments
483    /// * `plan` - The execution plan that will perform the table scan
484    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
485        Self { plan }
486    }
487
488    /// Get a reference to the execution plan for this scan result.
489    ///
490    /// Returns a reference to the [`ExecutionPlan`] that will perform
491    /// the actual table scanning and data retrieval.
492    pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
493        &self.plan
494    }
495
496    /// Consume this ScanResult and return the execution plan.
497    ///
498    /// Returns the owned [`ExecutionPlan`] that will perform
499    /// the actual table scanning and data retrieval.
500    pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
501        self.plan
502    }
503}
504
505impl From<Arc<dyn ExecutionPlan>> for ScanResult {
506    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
507        Self::new(plan)
508    }
509}
510
511/// A factory which creates [`TableProvider`]s at runtime given a URL.
512///
513/// For example, this can be used to create a table "on the fly"
514/// from a directory of files only when that name is referenced.
515#[async_trait]
516pub trait TableProviderFactory: Debug + Sync + Send {
517    /// Create a TableProvider with the given url
518    async fn create(
519        &self,
520        state: &dyn Session,
521        cmd: &CreateExternalTable,
522    ) -> Result<Arc<dyn TableProvider>>;
523}
524
525/// Describes arguments provided to the table function call.
526pub struct TableFunctionArgs<'e, 's> {
527    /// Call arguments.
528    exprs: &'e [Expr],
529    /// Session within which the function is called.
530    session: &'s dyn Session,
531}
532
533impl<'e, 's> TableFunctionArgs<'e, 's> {
534    /// Make a new [`TableFunctionArgs`].
535    pub fn new(exprs: &'e [Expr], session: &'s dyn Session) -> Self {
536        Self { exprs, session }
537    }
538
539    /// Get expressions passed as the called function arguments.
540    pub fn exprs(&self) -> &'e [Expr] {
541        self.exprs
542    }
543
544    /// Get a session where the table function is called.
545    pub fn session(&self) -> &'s dyn Session {
546        self.session
547    }
548}
549
550/// A trait for table function implementations
551pub trait TableFunctionImpl: Debug + Sync + Send + Any {
552    /// Create a table provider
553    #[deprecated(
554        since = "53.0.0",
555        note = "Implement `TableFunctionImpl::call_with_args` instead"
556    )]
557    fn call(&self, _exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
558        internal_err!(
559            "TableFunctionImpl::call is not implemented. Implement TableFunctionImpl::call_with_args instead."
560        )
561    }
562
563    /// Create a table provider
564    fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
565        #[expect(deprecated)]
566        self.call(args.exprs)
567    }
568}
569
570/// A table that uses a function to generate data
571#[derive(Clone, Debug)]
572pub struct TableFunction {
573    /// Name of the table function
574    name: String,
575    /// Function implementation
576    fun: Arc<dyn TableFunctionImpl>,
577}
578
579impl TableFunction {
580    /// Create a new table function
581    pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
582        Self { name, fun }
583    }
584
585    /// Get the name of the table function
586    pub fn name(&self) -> &str {
587        &self.name
588    }
589
590    /// Get the implementation of the table function
591    pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
592        &self.fun
593    }
594
595    /// Get the function implementation and generate a table
596    #[deprecated(
597        since = "53.0.0",
598        note = "Use `TableFunction::create_table_provider_with_args` instead"
599    )]
600    pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
601        #[expect(deprecated)]
602        self.fun.call(args)
603    }
604
605    /// Get the function implementation and generate a table
606    pub fn create_table_provider_with_args(
607        &self,
608        args: TableFunctionArgs,
609    ) -> Result<Arc<dyn TableProvider>> {
610        self.fun.call_with_args(args)
611    }
612}