datafusion_catalog/table.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

use crate::session::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
    CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;

/// A table which can be queried and modified.
///
/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
///
/// [`TableProvider`] represents a source of data which can provide data as
/// Apache Arrow [`RecordBatch`]es. Implementations of this trait provide
/// important information for planning such as:
///
/// 1. [`Self::schema`]: The schema (columns and their types) of the table
/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
/// 3. [`Self::scan`]: An [`ExecutionPlan`] that can read data
///
/// [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
/// [`CatalogProvider`]: super::CatalogProvider
#[async_trait]
pub trait TableProvider: Debug + Sync + Send {
    /// Returns the table provider as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Get a reference to the schema for this table
    fn schema(&self) -> SchemaRef;

    /// Get a reference to the constraints of the table.
    ///
    /// Returns:
    /// - `None` for tables that do not support constraints.
    /// - `Some(&Constraints)` for tables supporting constraints.
    ///
    /// Therefore, a `Some(&Constraints::empty())` return value indicates that
    /// this table supports constraints, but there are no constraints.
    fn constraints(&self) -> Option<&Constraints> {
        None
    }

    /// Get the type of this table for metadata/catalog purposes.
    fn table_type(&self) -> TableType;

    /// Get the create statement used to create this table, if available.
    fn get_table_definition(&self) -> Option<&str> {
        None
    }

    /// Get the [`LogicalPlan`] of this table, if available.
    fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
        None
    }

    /// Get the default value for a column, if available.
    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
        None
    }

    /// Create an [`ExecutionPlan`] for scanning the table with optionally
    /// specified `projection`, `filter` and `limit`, described below.
    ///
    /// The `ExecutionPlan` is responsible for scanning the datasource's
    /// partitions in a streaming, parallelized fashion.
    ///
    /// # Projection
    ///
    /// If specified, only a subset of columns should be returned, in the order
    /// specified. The projection is a set of indexes of the fields in
    /// [`Self::schema`].
    ///
    /// DataFusion provides the projection to scan only the columns actually
    /// used in the query to improve performance, an optimization called
    /// "Projection Pushdown". Some datasources, such as Parquet, can use this
    /// information to go significantly faster when only a subset of columns is
    /// required.
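    ///
    /// For example, for a table with schema `(a, b, c)`, a projection of
    /// `[2, 0]` asks the scan to return only columns `c` and `a`, in that
    /// order.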
    ///
    /// # Filters
    ///
    /// A list of boolean filter [`Expr`]s to evaluate *during* the scan, in the
    /// manner specified by [`Self::supports_filters_pushdown`]. Only rows for
    /// which *all* of the `Expr`s evaluate to `true` should be returned (i.e. the
    /// expressions are `AND`ed together).
    ///
    /// To enable filter pushdown you must override
    /// [`Self::supports_filters_pushdown`]: the default implementation does not
    /// push down any filters, in which case `filters` will be empty.
    ///
    /// DataFusion pushes filtering into the scans whenever possible
    /// ("Filter Pushdown"), and depending on the format and the
    /// implementation of the format, evaluating the predicate during the scan
    /// can increase performance significantly.
    ///
    /// ## Note: Some columns may appear *only* in Filters
    ///
    /// In certain cases, a query may only use a certain column in a Filter that
    /// has been completely pushed down to the scan. In this case, the
    /// projection will not contain all the columns found in the filter
    /// expressions.
    ///
    /// For example, given the query `SELECT t.a FROM t WHERE t.b > 5`,
    ///
    /// ```text
    /// ┌────────────────────┐
    /// │  Projection(t.a)   │
    /// └────────────────────┘
    ///            ▲
    ///            │
    ///            │
    /// ┌────────────────────┐    Filter     ┌────────────────────┐   Projection    ┌────────────────────┐
    /// │  Filter(t.b > 5)   │────Pushdown──▶│  Projection(t.a)   │────Pushdown────▶│  Projection(t.a)   │
    /// └────────────────────┘               └────────────────────┘                 └────────────────────┘
    ///            ▲                                    ▲                                     ▲
    ///            │                                    │                                     │
    ///            │                                    │                          ┌────────────────────┐
    /// ┌────────────────────┐              ┌────────────────────┐                 │        Scan        │
    /// │        Scan        │              │        Scan        │                 │  filter=(t.b > 5)  │
    /// └────────────────────┘              │  filter=(t.b > 5)  │                 │  projection=(t.a)  │
    ///                                     └────────────────────┘                 └────────────────────┘
    ///
    /// Initial Plan                 If `TableProviderFilterPushDown`      Projection pushdown notes that
    ///                              returns true, filter pushdown         the scan only needs t.a
    ///                              pushes the filter into the scan
    ///                              BUT internally evaluating the
    ///                              predicate still requires t.b
    /// ```
    ///
    /// # Limit
    ///
    /// If `limit` is specified, the scan need only produce *at least* this many
    /// rows (though it may return more). Like Projection Pushdown and Filter
    /// Pushdown, DataFusion pushes `LIMIT`s as far down in the plan as
    /// possible, called "Limit Pushdown", as some sources can use this
    /// information to improve their performance. Note that if there are any
    /// Inexact filters pushed down, the LIMIT cannot be pushed down. This is
    /// because inexact filters do not guarantee that every filtered row is
    /// removed, so applying the limit could lead to too few rows being available
    /// to return as a final result.
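    ///
    /// For example, for the query `SELECT * FROM t LIMIT 10` (with no pushed
    /// down filters), DataFusion may call `scan` with `limit = Some(10)`, and
    /// the scan can stop producing rows once ten have been emitted.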
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
    ///
    /// This method uses [`ScanArgs`] to pass scan parameters in a structured way
    /// and returns a [`ScanResult`] containing the execution plan.
    ///
    /// Table providers can override this method to take advantage of additional
    /// parameters like the upcoming `preferred_ordering` that may not be available
    /// through other scan methods.
    ///
    /// # Arguments
    /// * `state` - The session state containing configuration and context
    /// * `args` - Structured scan arguments including projection, filters, limit,
    ///   and ordering preferences
    ///
    /// # Returns
    /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
    ///
    /// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
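    ///
    /// # Example
    ///
    /// A minimal sketch of calling this method through the default
    /// implementation; the `provider`, `session`, and `filters` values are
    /// assumed to exist in the surrounding code:
    ///
    /// ```rust,ignore
    /// // Build the structured arguments, then let the provider plan the scan.
    /// let args = ScanArgs::default()
    ///     .with_projection(Some(&[0, 2]))  // only columns 0 and 2
    ///     .with_filters(Some(&filters))    // pre-parsed filter `Expr`s
    ///     .with_limit(Some(10));           // at most 10 rows are needed
    /// let result = provider.scan_with_args(&session, args).await?;
    /// let plan = result.into_inner();
    /// ```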
    async fn scan_with_args<'a>(
        &self,
        state: &dyn Session,
        args: ScanArgs<'a>,
    ) -> Result<ScanResult> {
        let filters = args.filters().unwrap_or(&[]);
        let projection = args.projection().map(|p| p.to_vec());
        let limit = args.limit();
        let plan = self
            .scan(state, projection.as_ref(), filters, limit)
            .await?;
        Ok(plan.into())
    }

    /// Specify if DataFusion should provide filter expressions to the
    /// TableProvider to apply *during* the scan.
    ///
    /// Some TableProviders can evaluate filters more efficiently than the
    /// `Filter` operator in DataFusion, for example by using an index.
    ///
    /// # Parameters and Return Value
    ///
    /// The return `Vec` must have one element for each element of the `filters`
    /// argument. The value of each element indicates if the TableProvider can
    /// apply the corresponding filter during the scan. The position in the return
    /// value corresponds to the expression in the `filters` parameter.
    ///
    /// If the length of the resulting `Vec` does not match the `filters` input,
    /// an error will be returned.
    ///
    /// Each element in the resulting `Vec` is one of the following:
    /// * [`Exact`] or [`Inexact`]: The TableProvider can apply the filter
    ///   during scan
    /// * [`Unsupported`]: The TableProvider cannot apply the filter during scan
    ///
    /// By default, this function returns [`Unsupported`] for all filters,
    /// meaning no filters will be provided to [`Self::scan`].
    ///
    /// [`Unsupported`]: TableProviderFilterPushDown::Unsupported
    /// [`Exact`]: TableProviderFilterPushDown::Exact
    /// [`Inexact`]: TableProviderFilterPushDown::Inexact
    ///
    /// # Example
    ///
    /// ```rust
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::SchemaRef;
    /// # use async_trait::async_trait;
    /// # use datafusion_catalog::{TableProvider, Session};
    /// # use datafusion_common::Result;
    /// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
    /// # use datafusion_physical_plan::ExecutionPlan;
    /// // Define a struct that implements the TableProvider trait
    /// #[derive(Debug)]
    /// struct TestDataSource {}
    ///
    /// #[async_trait]
    /// impl TableProvider for TestDataSource {
    /// # fn as_any(&self) -> &dyn Any { todo!() }
    /// # fn schema(&self) -> SchemaRef { todo!() }
    /// # fn table_type(&self) -> TableType { todo!() }
    /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec<usize>>, f: &[Expr], l: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> {
    ///         todo!()
    /// # }
    ///     // Override the supports_filters_pushdown to evaluate which expressions
    ///     // to accept as pushdown predicates.
    ///     fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
    ///         // Process each filter
    ///         let support: Vec<_> = filters.iter().map(|expr| {
    ///             match expr {
    ///                 // This example only supports a between expr with a single column named "c1".
    ///                 Expr::Between(between_expr) => {
    ///                     between_expr.expr
    ///                         .try_as_col()
    ///                         .map(|column| {
    ///                             if column.name == "c1" {
    ///                                 TableProviderFilterPushDown::Exact
    ///                             } else {
    ///                                 TableProviderFilterPushDown::Unsupported
    ///                             }
    ///                         })
    ///                         // If there is no column in the expr set the filter to unsupported.
    ///                         .unwrap_or(TableProviderFilterPushDown::Unsupported)
    ///                 }
    ///                 _ => {
    ///                     // For all other cases return Unsupported.
    ///                     TableProviderFilterPushDown::Unsupported
    ///                 }
    ///             }
    ///         }).collect();
    ///         Ok(support)
    ///     }
    /// }
    /// ```
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![
            TableProviderFilterPushDown::Unsupported;
            filters.len()
        ])
    }

    /// Get statistics for this table, if available.
    ///
    /// Although not presently used in mainline DataFusion, this allows
    /// implementation-specific behavior for downstream repositories, in
    /// conjunction with specialized optimizer rules, to perform operations
    /// such as re-ordering of joins.
    fn statistics(&self) -> Option<Statistics> {
        None
    }

    /// Return an [`ExecutionPlan`] to insert data into this table, if
    /// supported.
    ///
    /// The returned plan should return a single row in a UInt64
    /// column called "count" such as the following:
    ///
    /// ```text
    /// +-------+
    /// | count |
    /// +-------+
    /// | 6     |
    /// +-------+
    /// ```
    ///
    /// # See Also
    ///
    /// See [`DataSinkExec`] for the common pattern of inserting
    /// streams of `RecordBatch`es as files to an ObjectStore.
    ///
    /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
    async fn insert_into(
        &self,
        _state: &dyn Session,
        _input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Insert into not implemented for this table")
    }
}

/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
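///
/// `ScanArgs` is built in a fluent, builder style. A minimal sketch of
/// constructing arguments for a projected, limited scan (the column indices
/// are illustrative):
///
/// ```rust
/// # use datafusion_catalog::ScanArgs;
/// // Keep only the first two columns and cap the scan at 100 rows.
/// let projection = vec![0, 1];
/// let args = ScanArgs::default()
///     .with_projection(Some(projection.as_slice()))
///     .with_limit(Some(100));
/// assert_eq!(args.projection(), Some(projection.as_slice()));
/// assert_eq!(args.limit(), Some(100));
/// ```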
#[derive(Debug, Clone, Default)]
pub struct ScanArgs<'a> {
    filters: Option<&'a [Expr]>,
    projection: Option<&'a [usize]>,
    limit: Option<usize>,
}

impl<'a> ScanArgs<'a> {
    /// Set the column projection for the scan.
    ///
    /// The projection is a list of column indices from [`TableProvider::schema`]
    /// that should be included in the scan results. If `None`, all columns are included.
    ///
    /// # Arguments
    /// * `projection` - Optional slice of column indices to project
    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
        self.projection = projection;
        self
    }

    /// Get the column projection for the scan.
    ///
    /// Returns a reference to the projection column indices, or `None` if
    /// no projection was specified (meaning all columns should be included).
    pub fn projection(&self) -> Option<&'a [usize]> {
        self.projection
    }

    /// Set the filter expressions for the scan.
    ///
    /// Filters are boolean expressions that should be evaluated during the scan
    /// to reduce the number of rows returned. All expressions are combined with AND logic.
    /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
    ///
    /// # Arguments
    /// * `filters` - Optional slice of filter expressions
    pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self {
        self.filters = filters;
        self
    }

    /// Get the filter expressions for the scan.
    ///
    /// Returns a reference to the filter expressions, or `None` if no filters were specified.
    pub fn filters(&self) -> Option<&'a [Expr]> {
        self.filters
    }

    /// Set the maximum number of rows to return from the scan.
    ///
    /// If specified, the scan should return at most this many rows. This is typically
    /// used to optimize queries with `LIMIT` clauses.
    ///
    /// # Arguments
    /// * `limit` - Optional maximum number of rows to return
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }

    /// Get the maximum number of rows to return from the scan.
    ///
    /// Returns the row limit, or `None` if no limit was specified.
    pub fn limit(&self) -> Option<usize> {
        self.limit
    }
}

/// Result of a table scan operation from [`TableProvider::scan_with_args`].
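///
/// A `ScanResult` wraps the produced [`ExecutionPlan`]. It can be built with
/// [`ScanResult::new`] or converted from an `Arc<dyn ExecutionPlan>` via
/// `From`/`Into`, which is what the default implementation of
/// [`TableProvider::scan_with_args`] does with `Ok(plan.into())`.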
#[derive(Debug, Clone)]
pub struct ScanResult {
    /// The ExecutionPlan to run.
    plan: Arc<dyn ExecutionPlan>,
}

impl ScanResult {
    /// Create a new `ScanResult` with the given execution plan.
    ///
    /// # Arguments
    /// * `plan` - The execution plan that will perform the table scan
    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        Self { plan }
    }

    /// Get a reference to the execution plan for this scan result.
    ///
    /// Returns a reference to the [`ExecutionPlan`] that will perform
    /// the actual table scanning and data retrieval.
    pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
        &self.plan
    }

    /// Consume this ScanResult and return the execution plan.
    ///
    /// Returns the owned [`ExecutionPlan`] that will perform
    /// the actual table scanning and data retrieval.
    pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
        self.plan
    }
}

impl From<Arc<dyn ExecutionPlan>> for ScanResult {
    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
        Self::new(plan)
    }
}

/// A factory which creates [`TableProvider`]s at runtime given a URL.
///
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
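///
/// # Example
///
/// A hedged sketch of a factory that builds a provider from the table's
/// location; `MyProvider` is a placeholder for any [`TableProvider`]
/// implementation:
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use async_trait::async_trait;
/// use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
/// use datafusion_common::Result;
/// use datafusion_expr::CreateExternalTable;
///
/// #[derive(Debug)]
/// struct MyTableFactory {}
///
/// #[async_trait]
/// impl TableProviderFactory for MyTableFactory {
///     async fn create(
///         &self,
///         _state: &dyn Session,
///         cmd: &CreateExternalTable,
///     ) -> Result<Arc<dyn TableProvider>> {
///         // `cmd.location` carries the URL from
///         // `CREATE EXTERNAL TABLE ... LOCATION '<url>'`.
///         let provider = MyProvider::try_new(&cmd.location)?;
///         Ok(Arc::new(provider))
///     }
/// }
/// ```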
#[async_trait]
pub trait TableProviderFactory: Debug + Sync + Send {
    /// Create a TableProvider with the given URL
    async fn create(
        &self,
        state: &dyn Session,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>>;
}

/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send {
    /// Create a table provider
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}

/// A table that uses a function to generate data
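///
/// # Example
///
/// A hedged sketch of registering a function as a [`TableFunction`];
/// `MyProvider` is a placeholder for any [`TableProvider`] built from the
/// call arguments:
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use datafusion_catalog::{TableFunction, TableFunctionImpl, TableProvider};
/// use datafusion_common::Result;
/// use datafusion_expr::Expr;
///
/// #[derive(Debug)]
/// struct MyFunc {}
///
/// impl TableFunctionImpl for MyFunc {
///     fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
///         // Inspect `args` (e.g. a literal row count) and build a provider.
///         let provider = MyProvider::try_new(args)?;
///         Ok(Arc::new(provider))
///     }
/// }
///
/// let func = TableFunction::new("my_func".to_string(), Arc::new(MyFunc {}));
/// assert_eq!(func.name(), "my_func");
/// ```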
#[derive(Clone, Debug)]
pub struct TableFunction {
    /// Name of the table function
    name: String,
    /// Function implementation
    fun: Arc<dyn TableFunctionImpl>,
}

impl TableFunction {
    /// Create a new table function
    pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
        Self { name, fun }
    }

    /// Get the name of the table function
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Get the implementation of the table function
    pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
        &self.fun
    }

    /// Get the function implementation and generate a table
    pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        self.fun.call(args)
    }
}