datafusion_python/
table.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::datatypes::SchemaRef;
22use arrow::pyarrow::ToPyArrow;
23use async_trait::async_trait;
24use datafusion::catalog::{Session, TableProviderFactory};
25use datafusion::common::Column;
26use datafusion::datasource::{TableProvider, TableType};
27use datafusion::logical_expr::{
28 CreateExternalTable, Expr, LogicalPlanBuilder, TableProviderFilterPushDown,
29};
30use datafusion::physical_plan::ExecutionPlan;
31use datafusion::prelude::DataFrame;
32use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
33use datafusion_python_util::{create_logical_extension_capsule, table_provider_from_pycapsule};
34use pyo3::IntoPyObjectExt;
35use pyo3::prelude::*;
36
37use crate::context::PySessionContext;
38use crate::dataframe::PyDataFrame;
39use crate::dataset::Dataset;
40use crate::errors;
41use crate::expr::create_external_table::PyCreateExternalTable;
42
43#[pyclass(
47 from_py_object,
48 frozen,
49 name = "RawTable",
50 module = "datafusion.catalog",
51 subclass
52)]
53#[derive(Clone)]
54pub struct PyTable {
55 pub table: Arc<dyn TableProvider>,
56}
57
58impl PyTable {
59 pub fn table(&self) -> Arc<dyn TableProvider> {
60 self.table.clone()
61 }
62}
63
64#[pymethods]
65impl PyTable {
66 #[new]
76 pub fn new(obj: Bound<'_, PyAny>, session: Option<Bound<PyAny>>) -> PyResult<Self> {
77 let py = obj.py();
78 if let Ok(py_table) = obj.extract::<PyTable>() {
79 Ok(py_table)
80 } else if let Ok(py_table) = obj
81 .getattr("_inner")
82 .and_then(|inner| inner.extract::<PyTable>().map_err(Into::<PyErr>::into))
83 {
84 Ok(py_table)
85 } else if let Ok(py_df) = obj.extract::<PyDataFrame>() {
86 let provider = py_df.inner_df().as_ref().clone().into_view();
87 Ok(PyTable::from(provider))
88 } else if let Ok(py_df) = obj
89 .getattr("df")
90 .and_then(|inner| inner.extract::<PyDataFrame>().map_err(Into::<PyErr>::into))
91 {
92 let provider = py_df.inner_df().as_ref().clone().into_view();
93 Ok(PyTable::from(provider))
94 } else if let Some(provider) = {
95 let session = match session {
96 Some(session) => session,
97 None => PySessionContext::global_ctx()?.into_bound_py_any(obj.py())?,
98 };
99 table_provider_from_pycapsule(obj.clone(), session)?
100 } {
101 Ok(PyTable::from(provider))
102 } else {
103 let provider = Arc::new(Dataset::new(&obj, py)?) as Arc<dyn TableProvider>;
104 Ok(PyTable::from(provider))
105 }
106 }
107
108 #[getter]
110 fn schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
111 self.table.schema().to_pyarrow(py)
112 }
113
114 #[getter]
116 fn kind(&self) -> &str {
117 match self.table.table_type() {
118 TableType::Base => "physical",
119 TableType::View => "view",
120 TableType::Temporary => "temporary",
121 }
122 }
123
124 fn __repr__(&self) -> PyResult<String> {
125 let kind = self.kind();
126 Ok(format!("Table(kind={kind})"))
127 }
128}
129
130impl From<Arc<dyn TableProvider>> for PyTable {
131 fn from(table: Arc<dyn TableProvider>) -> Self {
132 Self { table }
133 }
134}
135
136#[derive(Clone, Debug)]
137pub(crate) struct TempViewTable {
138 df: Arc<DataFrame>,
139}
140
141impl TempViewTable {
146 pub(crate) fn new(df: Arc<DataFrame>) -> Self {
147 Self { df }
148 }
149}
150
151#[async_trait]
152impl TableProvider for TempViewTable {
153 fn as_any(&self) -> &dyn Any {
154 self
155 }
156
157 fn schema(&self) -> SchemaRef {
158 Arc::new(self.df.schema().as_arrow().clone())
159 }
160
161 fn table_type(&self) -> TableType {
162 TableType::Temporary
163 }
164
165 async fn scan(
166 &self,
167 state: &dyn Session,
168 projection: Option<&Vec<usize>>,
169 filters: &[Expr],
170 limit: Option<usize>,
171 ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
172 let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
173 let plan = self.df.logical_plan().clone();
174 let mut plan = LogicalPlanBuilder::from(plan);
175
176 if let Some(filter) = filter {
177 plan = plan.filter(filter)?;
178 }
179
180 let mut plan = if let Some(projection) = projection {
181 let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
183 if projection == ¤t_projection {
184 plan
185 } else {
186 let fields: Vec<Expr> = projection
187 .iter()
188 .map(|i| {
189 Expr::Column(Column::from(
190 self.df.logical_plan().schema().qualified_field(*i),
191 ))
192 })
193 .collect();
194 plan.project(fields)?
195 }
196 } else {
197 plan
198 };
199
200 if let Some(limit) = limit {
201 plan = plan.limit(0, Some(limit))?;
202 }
203
204 state.create_physical_plan(&plan.build()?).await
205 }
206
207 fn supports_filters_pushdown(
208 &self,
209 filters: &[&Expr],
210 ) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
211 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
212 }
213}
214
215#[derive(Debug)]
216pub(crate) struct RustWrappedPyTableProviderFactory {
217 pub(crate) table_provider_factory: Py<PyAny>,
218 pub(crate) codec: Arc<FFI_LogicalExtensionCodec>,
219}
220
221impl RustWrappedPyTableProviderFactory {
222 pub fn new(table_provider_factory: Py<PyAny>, codec: Arc<FFI_LogicalExtensionCodec>) -> Self {
223 Self {
224 table_provider_factory,
225 codec,
226 }
227 }
228
229 fn create_inner(
230 &self,
231 cmd: CreateExternalTable,
232 codec: Bound<PyAny>,
233 ) -> PyResult<Arc<dyn TableProvider>> {
234 Python::attach(|py| {
235 let provider = self.table_provider_factory.bind(py);
236 let cmd = PyCreateExternalTable::from(cmd);
237
238 provider
239 .call_method1("create", (cmd,))
240 .and_then(|t| PyTable::new(t, Some(codec)))
241 .map(|t| t.table())
242 })
243 }
244}
245
246#[async_trait]
247impl TableProviderFactory for RustWrappedPyTableProviderFactory {
248 async fn create(
249 &self,
250 _: &dyn Session,
251 cmd: &CreateExternalTable,
252 ) -> datafusion::common::Result<Arc<dyn TableProvider>> {
253 Python::attach(|py| {
254 let codec = create_logical_extension_capsule(py, self.codec.as_ref())
255 .map_err(errors::to_datafusion_err)?;
256
257 self.create_inner(cmd.clone(), codec.into_any())
258 .map_err(errors::to_datafusion_err)
259 })
260 }
261}