Skip to main content

datafusion_python/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::datatypes::SchemaRef;
22use arrow::pyarrow::ToPyArrow;
23use async_trait::async_trait;
24use datafusion::catalog::Session;
25use datafusion::common::Column;
26use datafusion::datasource::{TableProvider, TableType};
27use datafusion::logical_expr::{Expr, LogicalPlanBuilder, TableProviderFilterPushDown};
28use datafusion::physical_plan::ExecutionPlan;
29use datafusion::prelude::DataFrame;
30use pyo3::IntoPyObjectExt;
31use pyo3::prelude::*;
32
33use crate::context::PySessionContext;
34use crate::dataframe::PyDataFrame;
35use crate::dataset::Dataset;
36use crate::utils::table_provider_from_pycapsule;
37
38/// This struct is used as a common method for all TableProviders,
39/// whether they refer to an FFI provider, an internally known
40/// implementation, a dataset, or a dataframe view.
41#[pyclass(frozen, name = "RawTable", module = "datafusion.catalog", subclass)]
42#[derive(Clone)]
43pub struct PyTable {
44    pub table: Arc<dyn TableProvider>,
45}
46
47impl PyTable {
48    pub fn table(&self) -> Arc<dyn TableProvider> {
49        self.table.clone()
50    }
51}
52
53#[pymethods]
54impl PyTable {
55    /// Instantiate from any Python object that supports any of the table
56    /// types. We do not know a priori when using this method if the object
57    /// will be passed a wrapped or raw class. Here we handle all of the
58    /// following object types:
59    ///
60    /// - PyTable (essentially a clone operation), but either raw or wrapped
61    /// - DataFrame, either raw or wrapped
62    /// - FFI Table Providers via PyCapsule
63    /// - PyArrow Dataset objects
64    #[new]
65    pub fn new(obj: Bound<'_, PyAny>, session: Option<Bound<PyAny>>) -> PyResult<Self> {
66        let py = obj.py();
67        if let Ok(py_table) = obj.extract::<PyTable>() {
68            Ok(py_table)
69        } else if let Ok(py_table) = obj
70            .getattr("_inner")
71            .and_then(|inner| inner.extract::<PyTable>())
72        {
73            Ok(py_table)
74        } else if let Ok(py_df) = obj.extract::<PyDataFrame>() {
75            let provider = py_df.inner_df().as_ref().clone().into_view();
76            Ok(PyTable::from(provider))
77        } else if let Ok(py_df) = obj
78            .getattr("df")
79            .and_then(|inner| inner.extract::<PyDataFrame>())
80        {
81            let provider = py_df.inner_df().as_ref().clone().into_view();
82            Ok(PyTable::from(provider))
83        } else if let Some(provider) = {
84            let session = match session {
85                Some(session) => session,
86                None => PySessionContext::global_ctx()?.into_bound_py_any(obj.py())?,
87            };
88            table_provider_from_pycapsule(obj.clone(), session)?
89        } {
90            Ok(PyTable::from(provider))
91        } else {
92            let provider = Arc::new(Dataset::new(&obj, py)?) as Arc<dyn TableProvider>;
93            Ok(PyTable::from(provider))
94        }
95    }
96
97    /// Get a reference to the schema for this table
98    #[getter]
99    fn schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
100        self.table.schema().to_pyarrow(py)
101    }
102
103    /// Get the type of this table for metadata/catalog purposes.
104    #[getter]
105    fn kind(&self) -> &str {
106        match self.table.table_type() {
107            TableType::Base => "physical",
108            TableType::View => "view",
109            TableType::Temporary => "temporary",
110        }
111    }
112
113    fn __repr__(&self) -> PyResult<String> {
114        let kind = self.kind();
115        Ok(format!("Table(kind={kind})"))
116    }
117}
118
119impl From<Arc<dyn TableProvider>> for PyTable {
120    fn from(table: Arc<dyn TableProvider>) -> Self {
121        Self { table }
122    }
123}
124
125#[derive(Clone, Debug)]
126pub(crate) struct TempViewTable {
127    df: Arc<DataFrame>,
128}
129
130/// This is nearly identical to `DataFrameTableProvider`
131/// except that it is for temporary tables.
132/// Remove when https://github.com/apache/datafusion/issues/18026
133/// closes.
134impl TempViewTable {
135    pub(crate) fn new(df: Arc<DataFrame>) -> Self {
136        Self { df }
137    }
138}
139
140#[async_trait]
141impl TableProvider for TempViewTable {
142    fn as_any(&self) -> &dyn Any {
143        self
144    }
145
146    fn schema(&self) -> SchemaRef {
147        Arc::new(self.df.schema().as_arrow().clone())
148    }
149
150    fn table_type(&self) -> TableType {
151        TableType::Temporary
152    }
153
154    async fn scan(
155        &self,
156        state: &dyn Session,
157        projection: Option<&Vec<usize>>,
158        filters: &[Expr],
159        limit: Option<usize>,
160    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
161        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
162        let plan = self.df.logical_plan().clone();
163        let mut plan = LogicalPlanBuilder::from(plan);
164
165        if let Some(filter) = filter {
166            plan = plan.filter(filter)?;
167        }
168
169        let mut plan = if let Some(projection) = projection {
170            // avoiding adding a redundant projection (e.g. SELECT * FROM view)
171            let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
172            if projection == &current_projection {
173                plan
174            } else {
175                let fields: Vec<Expr> = projection
176                    .iter()
177                    .map(|i| {
178                        Expr::Column(Column::from(
179                            self.df.logical_plan().schema().qualified_field(*i),
180                        ))
181                    })
182                    .collect();
183                plan.project(fields)?
184            }
185        } else {
186            plan
187        };
188
189        if let Some(limit) = limit {
190            plan = plan.limit(0, Some(limit))?;
191        }
192
193        state.create_physical_plan(&plan.build()?).await
194    }
195
196    fn supports_filters_pushdown(
197        &self,
198        filters: &[&Expr],
199    ) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
200        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
201    }
202}