datafusion_catalog/table.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::borrow::Cow;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::session::Session;
24use arrow::datatypes::SchemaRef;
25use async_trait::async_trait;
26use datafusion_common::{Constraints, Statistics, not_impl_err};
27use datafusion_common::{Result, internal_err};
28use datafusion_expr::Expr;
29
30use datafusion_expr::dml::InsertOp;
31use datafusion_expr::{
32 CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
33};
34use datafusion_physical_plan::ExecutionPlan;
35
36/// A table which can be queried and modified.
37///
38/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
39///
40/// [`TableProvider`] represents a source of data which can provide data as
41/// Apache Arrow [`RecordBatch`]es. Implementations of this trait provide
42/// important information for planning such as:
43///
44/// 1. [`Self::schema`]: The schema (columns and their types) of the table
45/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
46/// 2. [`Self::scan`]: An [`ExecutionPlan`] that can read data
47///
48/// [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
49/// [`CatalogProvider`]: super::CatalogProvider
50#[async_trait]
51pub trait TableProvider: Any + Debug + Sync + Send {
52 /// Get a reference to the schema for this table
53 fn schema(&self) -> SchemaRef;
54
55 /// Get a reference to the constraints of the table.
56 /// Returns:
57 /// - `None` for tables that do not support constraints.
58 /// - `Some(&Constraints)` for tables supporting constraints.
59 /// Therefore, a `Some(&Constraints::empty())` return value indicates that
60 /// this table supports constraints, but there are no constraints.
61 fn constraints(&self) -> Option<&Constraints> {
62 None
63 }
64
65 /// Get the type of this table for metadata/catalog purposes.
66 fn table_type(&self) -> TableType;
67
68 /// Get the create statement used to create this table, if available.
69 fn get_table_definition(&self) -> Option<&str> {
70 None
71 }
72
73 /// Get the [`LogicalPlan`] of this table, if available.
74 fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
75 None
76 }
77
78 /// Get the default value for a column, if available.
79 fn get_column_default(&self, _column: &str) -> Option<&Expr> {
80 None
81 }
82
83 /// Create an [`ExecutionPlan`] for scanning the table with optional
84 /// `projection`, `filter`, and `limit`, described below.
85 ///
86 /// The returned `ExecutionPlan` is responsible for scanning the datasource's
87 /// partitions in a streaming, parallelized fashion.
88 ///
89 /// # Projection
90 ///
91 /// If specified, only a subset of columns should be returned, in the order
92 /// specified. The projection is a set of indexes of the fields in
93 /// [`Self::schema`].
94 ///
95 /// DataFusion provides the projection so the scan reads only the columns
96 /// actually used in the query, an optimization called "Projection
97 /// Pushdown". Some datasources, such as Parquet, can use this information
98 /// to go significantly faster when only a subset of columns is required.
99 ///
100 /// # Filters
101 ///
102 /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
103 /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
104 /// which *all* of the `Expr`s evaluate to `true` must be returned (that is,
105 /// the expressions are `AND`ed together).
106 ///
107 /// To enable filter pushdown, override
108 /// [`Self::supports_filters_pushdown`]. The default implementation does not
109 /// push down filters, and `filters` will be empty.
110 ///
111 /// DataFusion pushes filters into scans whenever possible ("Filter
112 /// Pushdown"). Depending on the data format and implementation, evaluating
113 /// predicates during the scan can significantly improve performance.
114 ///
115 /// ## Note: Some columns may appear *only* in Filters
116 ///
117 /// In some cases, a query may use a column only in a filter and the
118 /// projection will not contain all columns referenced by the filter
119 /// expressions.
120 ///
121 /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
122 ///
123 /// ```text
124 /// ┌────────────────────┐
125 /// │ Projection(t.a) │
126 /// └────────────────────┘
127 /// ▲
128 /// │
129 /// │
130 /// ┌────────────────────┐ Filter ┌────────────────────┐ Projection ┌────────────────────┐
131 /// │ Filter(t.b > 5) │────Pushdown──▶ │ Projection(t.a) │ ───Pushdown───▶ │ Projection(t.a) │
132 /// └────────────────────┘ └────────────────────┘ └────────────────────┘
133 /// ▲ ▲ ▲
134 /// │ │ │
135 /// │ │ ┌────────────────────┐
136 /// ┌────────────────────┐ ┌────────────────────┐ │ Scan │
137 /// │ Scan │ │ Scan │ │ filter=(t.b > 5) │
138 /// └────────────────────┘ │ filter=(t.b > 5) │ │ projection=(t.a) │
139 /// └────────────────────┘ └────────────────────┘
140 ///
141 /// Initial Plan If `TableProviderFilterPushDown` Projection pushdown notes that
142 /// returns true, filter pushdown the scan only needs t.a
143 /// pushes the filter into the scan
144 /// BUT internally evaluating the
145 /// predicate still requires t.b
146 /// ```
147 ///
148 /// # Limit
149 ///
150 /// If `limit` is specified, the scan must produce *at least* this many
151 /// rows, though it may return more. Like Projection Pushdown and Filter
152 /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as
153 /// possible. This is called "Limit Pushdown", and some sources can use the
154 /// information to improve performance.
155 ///
156 /// Note: If any pushed-down filters are `Inexact`, the `LIMIT` cannot be
157 /// pushed down. Inexact filters do not guarantee that every filtered row is
158 /// removed, so applying the limit could leave too few rows to return in the
159 /// final result.
160 ///
161 /// # Evaluation Order
162 ///
163 /// The logical evaluation order is `filters`, then `limit`, then
164 /// `projection`.
165 ///
166 /// Note that `limit` applies to the filtered result, not to the unfiltered
167 /// input, and `projection` affects only which columns are returned, not
168 /// which rows qualify.
169 ///
170 /// For example, if a scan receives:
171 ///
172 /// - `projection = [a]`
173 /// - `filters = [b > 5]`
174 /// - `limit = Some(3)`
175 ///
176 /// It must logically produce results equivalent to:
177 ///
178 /// ```text
179 /// PROJECTION a (LIMIT 3 (SCAN WHERE b > 5))
180 /// ```
181 ///
182 /// As noted above, columns referenced only by pushed-down filters may be
183 /// absent from `projection`.
184 async fn scan(
185 &self,
186 state: &dyn Session,
187 projection: Option<&Vec<usize>>,
188 filters: &[Expr],
189 limit: Option<usize>,
190 ) -> Result<Arc<dyn ExecutionPlan>>;
191
192 /// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
193 ///
194 /// This method uses [`ScanArgs`] to pass scan parameters in a structured way
195 /// and returns a [`ScanResult`] containing the execution plan.
196 ///
197 /// Table providers can override this method to take advantage of additional
198 /// parameters like the upcoming `preferred_ordering` that may not be available through
199 /// other scan methods.
200 ///
201 /// # Arguments
202 /// * `state` - The session state containing configuration and context
203 /// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences
204 ///
205 /// # Returns
206 /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
207 ///
208 /// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
209 async fn scan_with_args<'a>(
210 &self,
211 state: &dyn Session,
212 args: ScanArgs<'a>,
213 ) -> Result<ScanResult> {
214 let filters = args.filters().unwrap_or(&[]);
215 let projection = args.projection().map(|p| p.to_vec());
216 let limit = args.limit();
217 let plan = self
218 .scan(state, projection.as_ref(), filters, limit)
219 .await?;
220 Ok(plan.into())
221 }
222
223 /// Specify if DataFusion should provide filter expressions to the
224 /// TableProvider to apply *during* the scan.
225 ///
226 /// Some TableProviders can evaluate filters more efficiently than the
227 /// `Filter` operator in DataFusion, for example by using an index.
228 ///
229 /// # Parameters and Return Value
230 ///
231 /// The return `Vec` must have one element for each element of the `filters`
232 /// argument. The value of each element indicates if the TableProvider can
233 /// apply the corresponding filter during the scan. The position in the return
234 /// value corresponds to the expression in the `filters` parameter.
235 ///
236 /// If the length of the resulting `Vec` does not match the `filters` input
237 /// an error will be thrown.
238 ///
239 /// Each element in the resulting `Vec` is one of the following:
240 /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
241 /// during scan
242 /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
243 ///
244 /// By default, this function returns [`Unsupported`] for all filters,
245 /// meaning no filters will be provided to [`Self::scan`].
246 ///
247 /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
248 /// [`Exact`]: TableProviderFilterPushDown::Exact
249 /// [`Inexact`]: TableProviderFilterPushDown::Inexact
250 /// # Example
251 ///
252 /// ```rust
253 /// # use std::any::Any;
254 /// # use std::sync::Arc;
255 /// # use arrow::datatypes::SchemaRef;
256 /// # use async_trait::async_trait;
257 /// # use datafusion_catalog::{TableProvider, Session};
258 /// # use datafusion_common::Result;
259 /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
260 /// # use datafusion_physical_plan::ExecutionPlan;
261 /// // Define a struct that implements the TableProvider trait
262 /// #[derive(Debug)]
263 /// struct TestDataSource {}
264 ///
265 /// #[async_trait]
266 /// impl TableProvider for TestDataSource {
267 /// # fn schema(&self) -> SchemaRef { todo!() }
268 /// # fn table_type(&self) -> TableType { todo!() }
269 /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
270 /// todo!()
271 /// # }
272 /// // Override the supports_filters_pushdown to evaluate which expressions
273 /// // to accept as pushdown predicates.
274 /// fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
275 /// // Process each filter
276 /// let support: Vec<_> = filters.iter().map(|expr| {
277 /// match expr {
278 /// // This example only supports a between expr with a single column named "c1".
279 /// Expr::Between(between_expr) => {
280 /// between_expr.expr
281 /// .try_as_col()
282 /// .map(|column| {
283 /// if column.name == "c1" {
284 /// TableProviderFilterPushDown::Exact
285 /// } else {
286 /// TableProviderFilterPushDown::Unsupported
287 /// }
288 /// })
289 /// // If there is no column in the expr set the filter to unsupported.
290 /// .unwrap_or(TableProviderFilterPushDown::Unsupported)
291 /// }
292 /// _ => {
293 /// // For all other cases return Unsupported.
294 /// TableProviderFilterPushDown::Unsupported
295 /// }
296 /// }
297 /// }).collect();
298 /// Ok(support)
299 /// }
300 /// }
301 /// ```
302 fn supports_filters_pushdown(
303 &self,
304 filters: &[&Expr],
305 ) -> Result<Vec<TableProviderFilterPushDown>> {
306 Ok(vec![
307 TableProviderFilterPushDown::Unsupported;
308 filters.len()
309 ])
310 }
311
312 /// Get statistics for this table, if available
313 /// Although not presently used in mainline DataFusion, this allows implementation specific
314 /// behavior for downstream repositories, in conjunction with specialized optimizer rules to
315 /// perform operations such as re-ordering of joins.
316 fn statistics(&self) -> Option<Statistics> {
317 None
318 }
319
320 /// Return an [`ExecutionPlan`] to insert data into this table, if
321 /// supported.
322 ///
323 /// The returned plan should return a single row in a UInt64
324 /// column called "count" such as the following
325 ///
326 /// ```text
327 /// +-------+,
328 /// | count |,
329 /// +-------+,
330 /// | 6 |,
331 /// +-------+,
332 /// ```
333 ///
334 /// # See Also
335 ///
336 /// See [`DataSinkExec`] for the common pattern of inserting a
337 /// streams of `RecordBatch`es as files to an ObjectStore.
338 ///
339 /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
340 async fn insert_into(
341 &self,
342 _state: &dyn Session,
343 _input: Arc<dyn ExecutionPlan>,
344 _insert_op: InsertOp,
345 ) -> Result<Arc<dyn ExecutionPlan>> {
346 not_impl_err!("Insert into not implemented for this table")
347 }
348
349 /// Delete rows matching the filter predicates.
350 ///
351 /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64).
352 /// Empty `filters` deletes all rows.
353 async fn delete_from(
354 &self,
355 _state: &dyn Session,
356 _filters: Vec<Expr>,
357 ) -> Result<Arc<dyn ExecutionPlan>> {
358 not_impl_err!("DELETE not supported for {} table", self.table_type())
359 }
360
361 /// Update rows matching the filter predicates.
362 ///
363 /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64).
364 /// Empty `filters` updates all rows.
365 async fn update(
366 &self,
367 _state: &dyn Session,
368 _assignments: Vec<(String, Expr)>,
369 _filters: Vec<Expr>,
370 ) -> Result<Arc<dyn ExecutionPlan>> {
371 not_impl_err!("UPDATE not supported for {} table", self.table_type())
372 }
373
374 /// Remove all rows from the table.
375 ///
376 /// Should return an [ExecutionPlan] producing a single row with count (UInt64),
377 /// representing the number of rows removed.
378 async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
379 not_impl_err!("TRUNCATE not supported for {} table", self.table_type())
380 }
381}
382
383impl dyn TableProvider {
384 /// Returns `true` if the table provider is of type `T`.
385 ///
386 /// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
387 /// called on `Arc<dyn TableProvider>` via auto-deref.
388 pub fn is<T: TableProvider>(&self) -> bool {
389 (self as &dyn Any).is::<T>()
390 }
391
392 /// Attempts to downcast this table provider to a concrete type `T`,
393 /// returning `None` if the provider is not of that type.
394 ///
395 /// Works correctly when called on `Arc<dyn TableProvider>` via auto-deref,
396 /// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
397 /// downcast the `Arc` itself.
398 pub fn downcast_ref<T: TableProvider>(&self) -> Option<&T> {
399 (self as &dyn Any).downcast_ref()
400 }
401}
402
403/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
404#[derive(Debug, Clone, Default)]
405pub struct ScanArgs<'a> {
406 filters: Option<&'a [Expr]>,
407 projection: Option<&'a [usize]>,
408 limit: Option<usize>,
409}
410
411impl<'a> ScanArgs<'a> {
412 /// Set the column projection for the scan.
413 ///
414 /// The projection is a list of column indices from [`TableProvider::schema`]
415 /// that should be included in the scan results. If `None`, all columns are included.
416 ///
417 /// # Arguments
418 /// * `projection` - Optional slice of column indices to project
419 pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
420 self.projection = projection;
421 self
422 }
423
424 /// Get the column projection for the scan.
425 ///
426 /// Returns a reference to the projection column indices, or `None` if
427 /// no projection was specified (meaning all columns should be included).
428 pub fn projection(&self) -> Option<&'a [usize]> {
429 self.projection
430 }
431
432 /// Set the filter expressions for the scan.
433 ///
434 /// Filters are boolean expressions that should be evaluated during the scan
435 /// to reduce the number of rows returned. All expressions are combined with AND logic.
436 /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
437 ///
438 /// # Arguments
439 /// * `filters` - Optional slice of filter expressions
440 pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self {
441 self.filters = filters;
442 self
443 }
444
445 /// Get the filter expressions for the scan.
446 ///
447 /// Returns a reference to the filter expressions, or `None` if no filters were specified.
448 pub fn filters(&self) -> Option<&'a [Expr]> {
449 self.filters
450 }
451
452 /// Set the maximum number of rows to return from the scan.
453 ///
454 /// If specified, the scan should return at most this many rows. This is typically
455 /// used to optimize queries with `LIMIT` clauses.
456 ///
457 /// # Arguments
458 /// * `limit` - Optional maximum number of rows to return
459 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
460 self.limit = limit;
461 self
462 }
463
464 /// Get the maximum number of rows to return from the scan.
465 ///
466 /// Returns the row limit, or `None` if no limit was specified.
467 pub fn limit(&self) -> Option<usize> {
468 self.limit
469 }
470}
471
472/// Result of a table scan operation from [`TableProvider::scan_with_args`].
473#[derive(Debug, Clone)]
474pub struct ScanResult {
475 /// The ExecutionPlan to run.
476 plan: Arc<dyn ExecutionPlan>,
477}
478
479impl ScanResult {
480 /// Create a new `ScanResult` with the given execution plan.
481 ///
482 /// # Arguments
483 /// * `plan` - The execution plan that will perform the table scan
484 pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
485 Self { plan }
486 }
487
488 /// Get a reference to the execution plan for this scan result.
489 ///
490 /// Returns a reference to the [`ExecutionPlan`] that will perform
491 /// the actual table scanning and data retrieval.
492 pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
493 &self.plan
494 }
495
496 /// Consume this ScanResult and return the execution plan.
497 ///
498 /// Returns the owned [`ExecutionPlan`] that will perform
499 /// the actual table scanning and data retrieval.
500 pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
501 self.plan
502 }
503}
504
505impl From<Arc<dyn ExecutionPlan>> for ScanResult {
506 fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
507 Self::new(plan)
508 }
509}
510
/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
#[async_trait]
pub trait TableProviderFactory: Debug + Sync + Send {
    /// Create a [`TableProvider`] for the table described by `cmd`.
    ///
    /// # Arguments
    /// * `state` - The session in which the table is being created
    /// * `cmd` - The [`CreateExternalTable`] command describing the table.
    ///   NOTE(review): presumably this is where the URL mentioned above comes
    ///   from; confirm against `CreateExternalTable`.
    ///
    /// # Returns
    /// The newly created provider, or an error if it could not be created.
    async fn create(
        &self,
        state: &dyn Session,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>>;
}
524
525/// Describes arguments provided to the table function call.
526pub struct TableFunctionArgs<'e, 's> {
527 /// Call arguments.
528 exprs: &'e [Expr],
529 /// Session within which the function is called.
530 session: &'s dyn Session,
531}
532
533impl<'e, 's> TableFunctionArgs<'e, 's> {
534 /// Make a new [`TableFunctionArgs`].
535 pub fn new(exprs: &'e [Expr], session: &'s dyn Session) -> Self {
536 Self { exprs, session }
537 }
538
539 /// Get expressions passed as the called function arguments.
540 pub fn exprs(&self) -> &'e [Expr] {
541 self.exprs
542 }
543
544 /// Get a session where the table function is called.
545 pub fn session(&self) -> &'s dyn Session {
546 self.session
547 }
548}
549
550/// A trait for table function implementations
551pub trait TableFunctionImpl: Debug + Sync + Send + Any {
552 /// Create a table provider
553 #[deprecated(
554 since = "53.0.0",
555 note = "Implement `TableFunctionImpl::call_with_args` instead"
556 )]
557 fn call(&self, _exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
558 internal_err!(
559 "TableFunctionImpl::call is not implemented. Implement TableFunctionImpl::call_with_args instead."
560 )
561 }
562
563 /// Create a table provider
564 fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
565 #[expect(deprecated)]
566 self.call(args.exprs)
567 }
568}
569
570/// A table that uses a function to generate data
571#[derive(Clone, Debug)]
572pub struct TableFunction {
573 /// Name of the table function
574 name: String,
575 /// Function implementation
576 fun: Arc<dyn TableFunctionImpl>,
577}
578
579impl TableFunction {
580 /// Create a new table function
581 pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
582 Self { name, fun }
583 }
584
585 /// Get the name of the table function
586 pub fn name(&self) -> &str {
587 &self.name
588 }
589
590 /// Get the implementation of the table function
591 pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
592 &self.fun
593 }
594
595 /// Get the function implementation and generate a table
596 #[deprecated(
597 since = "53.0.0",
598 note = "Use `TableFunction::create_table_provider_with_args` instead"
599 )]
600 pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
601 #[expect(deprecated)]
602 self.fun.call(args)
603 }
604
605 /// Get the function implementation and generate a table
606 pub fn create_table_provider_with_args(
607 &self,
608 args: TableFunctionArgs,
609 ) -> Result<Arc<dyn TableProvider>> {
610 self.fun.call_with_args(args)
611 }
612}