datafusion_python/
table.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::datatypes::SchemaRef;
22use arrow::pyarrow::ToPyArrow;
23use async_trait::async_trait;
24use datafusion::catalog::Session;
25use datafusion::common::Column;
26use datafusion::datasource::{TableProvider, TableType};
27use datafusion::logical_expr::{Expr, LogicalPlanBuilder, TableProviderFilterPushDown};
28use datafusion::physical_plan::ExecutionPlan;
29use datafusion::prelude::DataFrame;
30use pyo3::IntoPyObjectExt;
31use pyo3::prelude::*;
32
33use crate::context::PySessionContext;
34use crate::dataframe::PyDataFrame;
35use crate::dataset::Dataset;
36use crate::utils::table_provider_from_pycapsule;
37
38#[pyclass(frozen, name = "RawTable", module = "datafusion.catalog", subclass)]
42#[derive(Clone)]
43pub struct PyTable {
44 pub table: Arc<dyn TableProvider>,
45}
46
47impl PyTable {
48 pub fn table(&self) -> Arc<dyn TableProvider> {
49 self.table.clone()
50 }
51}
52
53#[pymethods]
54impl PyTable {
55 #[new]
65 pub fn new(obj: Bound<'_, PyAny>, session: Option<Bound<PyAny>>) -> PyResult<Self> {
66 let py = obj.py();
67 if let Ok(py_table) = obj.extract::<PyTable>() {
68 Ok(py_table)
69 } else if let Ok(py_table) = obj
70 .getattr("_inner")
71 .and_then(|inner| inner.extract::<PyTable>())
72 {
73 Ok(py_table)
74 } else if let Ok(py_df) = obj.extract::<PyDataFrame>() {
75 let provider = py_df.inner_df().as_ref().clone().into_view();
76 Ok(PyTable::from(provider))
77 } else if let Ok(py_df) = obj
78 .getattr("df")
79 .and_then(|inner| inner.extract::<PyDataFrame>())
80 {
81 let provider = py_df.inner_df().as_ref().clone().into_view();
82 Ok(PyTable::from(provider))
83 } else if let Some(provider) = {
84 let session = match session {
85 Some(session) => session,
86 None => PySessionContext::global_ctx()?.into_bound_py_any(obj.py())?,
87 };
88 table_provider_from_pycapsule(obj.clone(), session)?
89 } {
90 Ok(PyTable::from(provider))
91 } else {
92 let provider = Arc::new(Dataset::new(&obj, py)?) as Arc<dyn TableProvider>;
93 Ok(PyTable::from(provider))
94 }
95 }
96
97 #[getter]
99 fn schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
100 self.table.schema().to_pyarrow(py)
101 }
102
103 #[getter]
105 fn kind(&self) -> &str {
106 match self.table.table_type() {
107 TableType::Base => "physical",
108 TableType::View => "view",
109 TableType::Temporary => "temporary",
110 }
111 }
112
113 fn __repr__(&self) -> PyResult<String> {
114 let kind = self.kind();
115 Ok(format!("Table(kind={kind})"))
116 }
117}
118
119impl From<Arc<dyn TableProvider>> for PyTable {
120 fn from(table: Arc<dyn TableProvider>) -> Self {
121 Self { table }
122 }
123}
124
125#[derive(Clone, Debug)]
126pub(crate) struct TempViewTable {
127 df: Arc<DataFrame>,
128}
129
130impl TempViewTable {
135 pub(crate) fn new(df: Arc<DataFrame>) -> Self {
136 Self { df }
137 }
138}
139
140#[async_trait]
141impl TableProvider for TempViewTable {
142 fn as_any(&self) -> &dyn Any {
143 self
144 }
145
146 fn schema(&self) -> SchemaRef {
147 Arc::new(self.df.schema().as_arrow().clone())
148 }
149
150 fn table_type(&self) -> TableType {
151 TableType::Temporary
152 }
153
154 async fn scan(
155 &self,
156 state: &dyn Session,
157 projection: Option<&Vec<usize>>,
158 filters: &[Expr],
159 limit: Option<usize>,
160 ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
161 let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
162 let plan = self.df.logical_plan().clone();
163 let mut plan = LogicalPlanBuilder::from(plan);
164
165 if let Some(filter) = filter {
166 plan = plan.filter(filter)?;
167 }
168
169 let mut plan = if let Some(projection) = projection {
170 let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
172 if projection == ¤t_projection {
173 plan
174 } else {
175 let fields: Vec<Expr> = projection
176 .iter()
177 .map(|i| {
178 Expr::Column(Column::from(
179 self.df.logical_plan().schema().qualified_field(*i),
180 ))
181 })
182 .collect();
183 plan.project(fields)?
184 }
185 } else {
186 plan
187 };
188
189 if let Some(limit) = limit {
190 plan = plan.limit(0, Some(limit))?;
191 }
192
193 state.create_physical_plan(&plan.build()?).await
194 }
195
196 fn supports_filters_pushdown(
197 &self,
198 filters: &[&Expr],
199 ) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
200 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
201 }
202}