datafusion_python/expr/
table_scan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Display, Formatter};
19
20use datafusion::common::TableReference;
21use datafusion::logical_expr::logical_plan::TableScan;
22use pyo3::prelude::*;
23use pyo3::IntoPyObjectExt;
24
25use crate::common::df_schema::PyDFSchema;
26use crate::expr::logical_node::LogicalNode;
27use crate::expr::PyExpr;
28use crate::sql::logical::PyLogicalPlan;
29
/// Python-facing wrapper around a DataFusion [`TableScan`] logical plan node.
///
/// Registered with Python under the name `TableScan` in the
/// `datafusion.expr` module.
#[pyclass(frozen, name = "TableScan", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyTableScan {
    /// The wrapped DataFusion table scan node that every accessor reads from.
    table_scan: TableScan,
}
35
36impl PyTableScan {
37    pub fn new(table_scan: TableScan) -> Self {
38        Self { table_scan }
39    }
40}
41
42impl From<PyTableScan> for TableScan {
43    fn from(tbl_scan: PyTableScan) -> TableScan {
44        tbl_scan.table_scan
45    }
46}
47
48impl From<TableScan> for PyTableScan {
49    fn from(table_scan: TableScan) -> PyTableScan {
50        PyTableScan { table_scan }
51    }
52}
53
54impl Display for PyTableScan {
55    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
56        write!(
57            f,
58            "TableScan\nTable Name: {}
59            Projections: {:?}
60            Projected Schema: {:?}
61            Filters: {:?}",
62            &self.table_scan.table_name,
63            &self.py_projections(),
64            &self.py_schema(),
65            &self.py_filters(),
66        )
67    }
68}
69
70#[pymethods]
71impl PyTableScan {
72    /// Retrieves the name of the table represented by this `TableScan` instance
73    #[pyo3(name = "table_name")]
74    fn py_table_name(&self) -> PyResult<String> {
75        Ok(format!("{}", self.table_scan.table_name))
76    }
77
78    #[pyo3(name = "fqn")]
79    fn fqn(&self) -> PyResult<(Option<String>, Option<String>, String)> {
80        let table_ref: TableReference = self.table_scan.table_name.clone();
81        Ok(match table_ref {
82            TableReference::Bare { table } => (None, None, table.to_string()),
83            TableReference::Partial { schema, table } => {
84                (None, Some(schema.to_string()), table.to_string())
85            }
86            TableReference::Full {
87                catalog,
88                schema,
89                table,
90            } => (
91                Some(catalog.to_string()),
92                Some(schema.to_string()),
93                table.to_string(),
94            ),
95        })
96    }
97
98    /// The column indexes that should be. Note if this is empty then
99    /// all columns should be read by the `TableProvider`. This function
100    /// provides a Tuple of the (index, column_name) to make things simpler
101    /// for the calling code since often times the name is preferred to
102    /// the index which is a lower level abstraction.
103    #[pyo3(name = "projection")]
104    fn py_projections(&self) -> PyResult<Vec<(usize, String)>> {
105        match &self.table_scan.projection {
106            Some(indices) => {
107                let schema = self.table_scan.source.schema();
108                Ok(indices
109                    .iter()
110                    .map(|i| (*i, schema.field(*i).name().to_string()))
111                    .collect())
112            }
113            None => Ok(vec![]),
114        }
115    }
116
117    /// Resulting schema from the `TableScan` operation
118    #[pyo3(name = "schema")]
119    fn py_schema(&self) -> PyResult<PyDFSchema> {
120        Ok((*self.table_scan.projected_schema).clone().into())
121    }
122
123    /// Certain `TableProvider` physical readers offer the capability to filter rows that
124    /// are read at read time. These `filters` are contained here.
125    #[pyo3(name = "filters")]
126    fn py_filters(&self) -> PyResult<Vec<PyExpr>> {
127        Ok(self
128            .table_scan
129            .filters
130            .iter()
131            .map(|expr| PyExpr::from(expr.clone()))
132            .collect())
133    }
134
135    /// Optional number of rows that should be read at read time by the `TableProvider`
136    #[pyo3(name = "fetch")]
137    fn py_fetch(&self) -> PyResult<Option<usize>> {
138        Ok(self.table_scan.fetch)
139    }
140
141    fn __repr__(&self) -> PyResult<String> {
142        Ok(format!("TableScan({self})"))
143    }
144}
145
146impl LogicalNode for PyTableScan {
147    fn inputs(&self) -> Vec<PyLogicalPlan> {
148        // table scans are leaf nodes and do not have inputs
149        vec![]
150    }
151
152    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
153        self.clone().into_bound_py_any(py)
154    }
155}