datafusion_python/
catalog.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use pyo3::exceptions::PyKeyError;
22use pyo3::prelude::*;
23
24use crate::errors::{PyDataFusionError, PyDataFusionResult};
25use crate::utils::wait_for_future;
26use datafusion::{
27    arrow::pyarrow::ToPyArrow,
28    catalog::{CatalogProvider, SchemaProvider},
29    datasource::{TableProvider, TableType},
30};
31
32#[pyclass(name = "Catalog", module = "datafusion", subclass)]
33pub struct PyCatalog {
34    pub catalog: Arc<dyn CatalogProvider>,
35}
36
37#[pyclass(name = "Database", module = "datafusion", subclass)]
38pub struct PyDatabase {
39    pub database: Arc<dyn SchemaProvider>,
40}
41
42#[pyclass(name = "Table", module = "datafusion", subclass)]
43pub struct PyTable {
44    pub table: Arc<dyn TableProvider>,
45}
46
47impl PyCatalog {
48    pub fn new(catalog: Arc<dyn CatalogProvider>) -> Self {
49        Self { catalog }
50    }
51}
52
53impl PyDatabase {
54    pub fn new(database: Arc<dyn SchemaProvider>) -> Self {
55        Self { database }
56    }
57}
58
59impl PyTable {
60    pub fn new(table: Arc<dyn TableProvider>) -> Self {
61        Self { table }
62    }
63
64    pub fn table(&self) -> Arc<dyn TableProvider> {
65        self.table.clone()
66    }
67}
68
69#[pymethods]
70impl PyCatalog {
71    fn names(&self) -> Vec<String> {
72        self.catalog.schema_names()
73    }
74
75    #[pyo3(signature = (name="public"))]
76    fn database(&self, name: &str) -> PyResult<PyDatabase> {
77        match self.catalog.schema(name) {
78            Some(database) => Ok(PyDatabase::new(database)),
79            None => Err(PyKeyError::new_err(format!(
80                "Database with name {name} doesn't exist."
81            ))),
82        }
83    }
84
85    fn __repr__(&self) -> PyResult<String> {
86        Ok(format!(
87            "Catalog(schema_names=[{}])",
88            self.names().join(";")
89        ))
90    }
91}
92
93#[pymethods]
94impl PyDatabase {
95    fn names(&self) -> HashSet<String> {
96        self.database.table_names().into_iter().collect()
97    }
98
99    fn table(&self, name: &str, py: Python) -> PyDataFusionResult<PyTable> {
100        if let Some(table) = wait_for_future(py, self.database.table(name))? {
101            Ok(PyTable::new(table))
102        } else {
103            Err(PyDataFusionError::Common(format!(
104                "Table not found: {name}"
105            )))
106        }
107    }
108
109    fn __repr__(&self) -> PyResult<String> {
110        Ok(format!(
111            "Database(table_names=[{}])",
112            Vec::from_iter(self.names()).join(";")
113        ))
114    }
115
116    // register_table
117    // deregister_table
118}
119
120#[pymethods]
121impl PyTable {
122    /// Get a reference to the schema for this table
123    #[getter]
124    fn schema(&self, py: Python) -> PyResult<PyObject> {
125        self.table.schema().to_pyarrow(py)
126    }
127
128    /// Get the type of this table for metadata/catalog purposes.
129    #[getter]
130    fn kind(&self) -> &str {
131        match self.table.table_type() {
132            TableType::Base => "physical",
133            TableType::View => "view",
134            TableType::Temporary => "temporary",
135        }
136    }
137
138    fn __repr__(&self) -> PyResult<String> {
139        let kind = self.kind();
140        Ok(format!("Table(kind={kind})"))
141    }
142
143    // fn scan
144    // fn statistics
145    // fn has_exact_statistics
146    // fn supports_filter_pushdown
147}