Skip to main content

datafusion_python/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::datatypes::SchemaRef;
22use arrow::pyarrow::ToPyArrow;
23use async_trait::async_trait;
24use datafusion::catalog::{Session, TableProviderFactory};
25use datafusion::common::Column;
26use datafusion::datasource::{TableProvider, TableType};
27use datafusion::logical_expr::{
28    CreateExternalTable, Expr, LogicalPlanBuilder, TableProviderFilterPushDown,
29};
30use datafusion::physical_plan::ExecutionPlan;
31use datafusion::prelude::DataFrame;
32use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
33use datafusion_python_util::{create_logical_extension_capsule, table_provider_from_pycapsule};
34use pyo3::IntoPyObjectExt;
35use pyo3::prelude::*;
36
37use crate::context::PySessionContext;
38use crate::dataframe::PyDataFrame;
39use crate::dataset::Dataset;
40use crate::errors;
41use crate::expr::create_external_table::PyCreateExternalTable;
42
/// This struct is used as a common method for all TableProviders,
/// whether they refer to an FFI provider, an internally known
/// implementation, a dataset, or a dataframe view.
///
/// Exposed to Python as `datafusion.catalog.RawTable`. The `frozen` flag means
/// PyO3 never hands out mutable borrows of it. The `subclass` flag allows Python
/// classes to derive from it.
#[pyclass(
    from_py_object,
    frozen,
    name = "RawTable",
    module = "datafusion.catalog",
    subclass
)]
#[derive(Clone)]
pub struct PyTable {
    /// The type-erased provider being wrapped. It is held in an `Arc`, so
    /// cloning a `PyTable` shares the provider rather than copying it.
    pub table: Arc<dyn TableProvider>,
}
57
58impl PyTable {
59    pub fn table(&self) -> Arc<dyn TableProvider> {
60        self.table.clone()
61    }
62}
63
64#[pymethods]
65impl PyTable {
66    /// Instantiate from any Python object that supports any of the table
67    /// types. We do not know a priori when using this method if the object
68    /// will be passed a wrapped or raw class. Here we handle all of the
69    /// following object types:
70    ///
71    /// - PyTable (essentially a clone operation), but either raw or wrapped
72    /// - DataFrame, either raw or wrapped
73    /// - FFI Table Providers via PyCapsule
74    /// - PyArrow Dataset objects
75    #[new]
76    pub fn new(obj: Bound<'_, PyAny>, session: Option<Bound<PyAny>>) -> PyResult<Self> {
77        let py = obj.py();
78        if let Ok(py_table) = obj.extract::<PyTable>() {
79            Ok(py_table)
80        } else if let Ok(py_table) = obj
81            .getattr("_inner")
82            .and_then(|inner| inner.extract::<PyTable>().map_err(Into::<PyErr>::into))
83        {
84            Ok(py_table)
85        } else if let Ok(py_df) = obj.extract::<PyDataFrame>() {
86            let provider = py_df.inner_df().as_ref().clone().into_view();
87            Ok(PyTable::from(provider))
88        } else if let Ok(py_df) = obj
89            .getattr("df")
90            .and_then(|inner| inner.extract::<PyDataFrame>().map_err(Into::<PyErr>::into))
91        {
92            let provider = py_df.inner_df().as_ref().clone().into_view();
93            Ok(PyTable::from(provider))
94        } else if let Some(provider) = {
95            let session = match session {
96                Some(session) => session,
97                None => PySessionContext::global_ctx()?.into_bound_py_any(obj.py())?,
98            };
99            table_provider_from_pycapsule(obj.clone(), session)?
100        } {
101            Ok(PyTable::from(provider))
102        } else {
103            let provider = Arc::new(Dataset::new(&obj, py)?) as Arc<dyn TableProvider>;
104            Ok(PyTable::from(provider))
105        }
106    }
107
108    /// Get a reference to the schema for this table
109    #[getter]
110    fn schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
111        self.table.schema().to_pyarrow(py)
112    }
113
114    /// Get the type of this table for metadata/catalog purposes.
115    #[getter]
116    fn kind(&self) -> &str {
117        match self.table.table_type() {
118            TableType::Base => "physical",
119            TableType::View => "view",
120            TableType::Temporary => "temporary",
121        }
122    }
123
124    fn __repr__(&self) -> PyResult<String> {
125        let kind = self.kind();
126        Ok(format!("Table(kind={kind})"))
127    }
128}
129
130impl From<Arc<dyn TableProvider>> for PyTable {
131    fn from(table: Arc<dyn TableProvider>) -> Self {
132        Self { table }
133    }
134}
135
/// Table provider over a `DataFrame`'s logical plan whose `table_type()`
/// reports `TableType::Temporary` rather than `View`.
#[derive(Clone, Debug)]
pub(crate) struct TempViewTable {
    /// Source DataFrame. Its logical plan is the input that each scan
    /// builds on top of.
    df: Arc<DataFrame>,
}
140
141/// This is nearly identical to `DataFrameTableProvider`
142/// except that it is for temporary tables.
143/// Remove when https://github.com/apache/datafusion/issues/18026
144/// closes.
145impl TempViewTable {
146    pub(crate) fn new(df: Arc<DataFrame>) -> Self {
147        Self { df }
148    }
149}
150
151#[async_trait]
152impl TableProvider for TempViewTable {
153    fn as_any(&self) -> &dyn Any {
154        self
155    }
156
157    fn schema(&self) -> SchemaRef {
158        Arc::new(self.df.schema().as_arrow().clone())
159    }
160
161    fn table_type(&self) -> TableType {
162        TableType::Temporary
163    }
164
165    async fn scan(
166        &self,
167        state: &dyn Session,
168        projection: Option<&Vec<usize>>,
169        filters: &[Expr],
170        limit: Option<usize>,
171    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
172        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
173        let plan = self.df.logical_plan().clone();
174        let mut plan = LogicalPlanBuilder::from(plan);
175
176        if let Some(filter) = filter {
177            plan = plan.filter(filter)?;
178        }
179
180        let mut plan = if let Some(projection) = projection {
181            // avoiding adding a redundant projection (e.g. SELECT * FROM view)
182            let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
183            if projection == &current_projection {
184                plan
185            } else {
186                let fields: Vec<Expr> = projection
187                    .iter()
188                    .map(|i| {
189                        Expr::Column(Column::from(
190                            self.df.logical_plan().schema().qualified_field(*i),
191                        ))
192                    })
193                    .collect();
194                plan.project(fields)?
195            }
196        } else {
197            plan
198        };
199
200        if let Some(limit) = limit {
201            plan = plan.limit(0, Some(limit))?;
202        }
203
204        state.create_physical_plan(&plan.build()?).await
205    }
206
207    fn supports_filters_pushdown(
208        &self,
209        filters: &[&Expr],
210    ) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
211        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
212    }
213}
214
/// Adapts a Python object into a DataFusion `TableProviderFactory`, the hook
/// DataFusion uses to resolve `CreateExternalTable` commands.
#[derive(Debug)]
pub(crate) struct RustWrappedPyTableProviderFactory {
    /// Python object whose `create` method is called with the
    /// `CreateExternalTable` command and must return something `PyTable::new`
    /// accepts.
    pub(crate) table_provider_factory: Py<PyAny>,
    /// Logical extension codec. It is exported to Python as a PyCapsule and
    /// passed along when converting the object the factory returns.
    pub(crate) codec: Arc<FFI_LogicalExtensionCodec>,
}
220
221impl RustWrappedPyTableProviderFactory {
222    pub fn new(table_provider_factory: Py<PyAny>, codec: Arc<FFI_LogicalExtensionCodec>) -> Self {
223        Self {
224            table_provider_factory,
225            codec,
226        }
227    }
228
229    fn create_inner(
230        &self,
231        cmd: CreateExternalTable,
232        codec: Bound<PyAny>,
233    ) -> PyResult<Arc<dyn TableProvider>> {
234        Python::attach(|py| {
235            let provider = self.table_provider_factory.bind(py);
236            let cmd = PyCreateExternalTable::from(cmd);
237
238            provider
239                .call_method1("create", (cmd,))
240                .and_then(|t| PyTable::new(t, Some(codec)))
241                .map(|t| t.table())
242        })
243    }
244}
245
246#[async_trait]
247impl TableProviderFactory for RustWrappedPyTableProviderFactory {
248    async fn create(
249        &self,
250        _: &dyn Session,
251        cmd: &CreateExternalTable,
252    ) -> datafusion::common::Result<Arc<dyn TableProvider>> {
253        Python::attach(|py| {
254            let codec = create_logical_extension_capsule(py, self.codec.as_ref())
255                .map_err(errors::to_datafusion_err)?;
256
257            self.create_inner(cmd.clone(), codec.into_any())
258                .map_err(errors::to_datafusion_err)
259        })
260    }
261}