Skip to main content

datafusion_python/expr/
table_scan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Display, Formatter};
19
20use datafusion::common::TableReference;
21use datafusion::logical_expr::logical_plan::TableScan;
22use pyo3::IntoPyObjectExt;
23use pyo3::prelude::*;
24
25use crate::common::df_schema::PyDFSchema;
26use crate::expr::PyExpr;
27use crate::expr::logical_node::LogicalNode;
28use crate::sql::logical::PyLogicalPlan;
29
30#[pyclass(
31    from_py_object,
32    frozen,
33    name = "TableScan",
34    module = "datafusion.expr",
35    subclass
36)]
37#[derive(Clone)]
38pub struct PyTableScan {
39    table_scan: TableScan,
40}
41
42impl PyTableScan {
43    pub fn new(table_scan: TableScan) -> Self {
44        Self { table_scan }
45    }
46}
47
48impl From<PyTableScan> for TableScan {
49    fn from(tbl_scan: PyTableScan) -> TableScan {
50        tbl_scan.table_scan
51    }
52}
53
54impl From<TableScan> for PyTableScan {
55    fn from(table_scan: TableScan) -> PyTableScan {
56        PyTableScan { table_scan }
57    }
58}
59
60impl Display for PyTableScan {
61    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
62        write!(
63            f,
64            "TableScan\nTable Name: {}
65            Projections: {:?}
66            Projected Schema: {:?}
67            Filters: {:?}",
68            &self.table_scan.table_name,
69            &self.py_projections(),
70            &self.py_schema(),
71            &self.py_filters(),
72        )
73    }
74}
75
76#[pymethods]
77impl PyTableScan {
78    /// Retrieves the name of the table represented by this `TableScan` instance
79    #[pyo3(name = "table_name")]
80    fn py_table_name(&self) -> PyResult<String> {
81        Ok(format!("{}", self.table_scan.table_name))
82    }
83
84    #[pyo3(name = "fqn")]
85    fn fqn(&self) -> PyResult<(Option<String>, Option<String>, String)> {
86        let table_ref: TableReference = self.table_scan.table_name.clone();
87        Ok(match table_ref {
88            TableReference::Bare { table } => (None, None, table.to_string()),
89            TableReference::Partial { schema, table } => {
90                (None, Some(schema.to_string()), table.to_string())
91            }
92            TableReference::Full {
93                catalog,
94                schema,
95                table,
96            } => (
97                Some(catalog.to_string()),
98                Some(schema.to_string()),
99                table.to_string(),
100            ),
101        })
102    }
103
104    /// The column indexes that should be. Note if this is empty then
105    /// all columns should be read by the `TableProvider`. This function
106    /// provides a Tuple of the (index, column_name) to make things simpler
107    /// for the calling code since often times the name is preferred to
108    /// the index which is a lower level abstraction.
109    #[pyo3(name = "projection")]
110    fn py_projections(&self) -> PyResult<Vec<(usize, String)>> {
111        match &self.table_scan.projection {
112            Some(indices) => {
113                let schema = self.table_scan.source.schema();
114                Ok(indices
115                    .iter()
116                    .map(|i| (*i, schema.field(*i).name().to_string()))
117                    .collect())
118            }
119            None => Ok(vec![]),
120        }
121    }
122
123    /// Resulting schema from the `TableScan` operation
124    #[pyo3(name = "schema")]
125    fn py_schema(&self) -> PyResult<PyDFSchema> {
126        Ok((*self.table_scan.projected_schema).clone().into())
127    }
128
129    /// Certain `TableProvider` physical readers offer the capability to filter rows that
130    /// are read at read time. These `filters` are contained here.
131    #[pyo3(name = "filters")]
132    fn py_filters(&self) -> PyResult<Vec<PyExpr>> {
133        Ok(self
134            .table_scan
135            .filters
136            .iter()
137            .map(|expr| PyExpr::from(expr.clone()))
138            .collect())
139    }
140
141    /// Optional number of rows that should be read at read time by the `TableProvider`
142    #[pyo3(name = "fetch")]
143    fn py_fetch(&self) -> PyResult<Option<usize>> {
144        Ok(self.table_scan.fetch)
145    }
146
147    fn __repr__(&self) -> PyResult<String> {
148        Ok(format!("TableScan({self})"))
149    }
150}
151
152impl LogicalNode for PyTableScan {
153    fn inputs(&self) -> Vec<PyLogicalPlan> {
154        // table scans are leaf nodes and do not have inputs
155        vec![]
156    }
157
158    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
159        self.clone().into_bound_py_any(py)
160    }
161}