datafusion_python/expr/
table_scan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::common::TableReference;
19use datafusion::logical_expr::logical_plan::TableScan;
20use pyo3::{prelude::*, IntoPyObjectExt};
21use std::fmt::{self, Display, Formatter};
22
23use crate::expr::logical_node::LogicalNode;
24use crate::sql::logical::PyLogicalPlan;
25use crate::{common::df_schema::PyDFSchema, expr::PyExpr};
26
27#[pyclass(name = "TableScan", module = "datafusion.expr", subclass)]
28#[derive(Clone)]
29pub struct PyTableScan {
30    table_scan: TableScan,
31}
32
33impl PyTableScan {
34    pub fn new(table_scan: TableScan) -> Self {
35        Self { table_scan }
36    }
37}
38
39impl From<PyTableScan> for TableScan {
40    fn from(tbl_scan: PyTableScan) -> TableScan {
41        tbl_scan.table_scan
42    }
43}
44
45impl From<TableScan> for PyTableScan {
46    fn from(table_scan: TableScan) -> PyTableScan {
47        PyTableScan { table_scan }
48    }
49}
50
51impl Display for PyTableScan {
52    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
53        write!(
54            f,
55            "TableScan\nTable Name: {}
56            Projections: {:?}
57            Projected Schema: {:?}
58            Filters: {:?}",
59            &self.table_scan.table_name,
60            &self.py_projections(),
61            &self.py_schema(),
62            &self.py_filters(),
63        )
64    }
65}
66
67#[pymethods]
68impl PyTableScan {
69    /// Retrieves the name of the table represented by this `TableScan` instance
70    #[pyo3(name = "table_name")]
71    fn py_table_name(&self) -> PyResult<String> {
72        Ok(format!("{}", self.table_scan.table_name))
73    }
74
75    #[pyo3(name = "fqn")]
76    fn fqn(&self) -> PyResult<(Option<String>, Option<String>, String)> {
77        let table_ref: TableReference = self.table_scan.table_name.clone();
78        Ok(match table_ref {
79            TableReference::Bare { table } => (None, None, table.to_string()),
80            TableReference::Partial { schema, table } => {
81                (None, Some(schema.to_string()), table.to_string())
82            }
83            TableReference::Full {
84                catalog,
85                schema,
86                table,
87            } => (
88                Some(catalog.to_string()),
89                Some(schema.to_string()),
90                table.to_string(),
91            ),
92        })
93    }
94
95    /// The column indexes that should be. Note if this is empty then
96    /// all columns should be read by the `TableProvider`. This function
97    /// provides a Tuple of the (index, column_name) to make things simpler
98    /// for the calling code since often times the name is preferred to
99    /// the index which is a lower level abstraction.
100    #[pyo3(name = "projection")]
101    fn py_projections(&self) -> PyResult<Vec<(usize, String)>> {
102        match &self.table_scan.projection {
103            Some(indices) => {
104                let schema = self.table_scan.source.schema();
105                Ok(indices
106                    .iter()
107                    .map(|i| (*i, schema.field(*i).name().to_string()))
108                    .collect())
109            }
110            None => Ok(vec![]),
111        }
112    }
113
114    /// Resulting schema from the `TableScan` operation
115    #[pyo3(name = "schema")]
116    fn py_schema(&self) -> PyResult<PyDFSchema> {
117        Ok((*self.table_scan.projected_schema).clone().into())
118    }
119
120    /// Certain `TableProvider` physical readers offer the capability to filter rows that
121    /// are read at read time. These `filters` are contained here.
122    #[pyo3(name = "filters")]
123    fn py_filters(&self) -> PyResult<Vec<PyExpr>> {
124        Ok(self
125            .table_scan
126            .filters
127            .iter()
128            .map(|expr| PyExpr::from(expr.clone()))
129            .collect())
130    }
131
132    /// Optional number of rows that should be read at read time by the `TableProvider`
133    #[pyo3(name = "fetch")]
134    fn py_fetch(&self) -> PyResult<Option<usize>> {
135        Ok(self.table_scan.fetch)
136    }
137
138    fn __repr__(&self) -> PyResult<String> {
139        Ok(format!("TableScan({})", self))
140    }
141}
142
143impl LogicalNode for PyTableScan {
144    fn inputs(&self) -> Vec<PyLogicalPlan> {
145        // table scans are leaf nodes and do not have inputs
146        vec![]
147    }
148
149    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
150        self.clone().into_bound_py_any(py)
151    }
152}