datafusion_python/
catalog.rs1use std::collections::HashSet;
19use std::sync::Arc;
20
21use pyo3::exceptions::PyKeyError;
22use pyo3::prelude::*;
23
24use crate::errors::{PyDataFusionError, PyDataFusionResult};
25use crate::utils::wait_for_future;
26use datafusion::{
27 arrow::pyarrow::ToPyArrow,
28 catalog::{CatalogProvider, SchemaProvider},
29 datasource::{TableProvider, TableType},
30};
31
32#[pyclass(name = "Catalog", module = "datafusion", subclass)]
33pub struct PyCatalog {
34 pub catalog: Arc<dyn CatalogProvider>,
35}
36
37#[pyclass(name = "Database", module = "datafusion", subclass)]
38pub struct PyDatabase {
39 pub database: Arc<dyn SchemaProvider>,
40}
41
42#[pyclass(name = "Table", module = "datafusion", subclass)]
43pub struct PyTable {
44 pub table: Arc<dyn TableProvider>,
45}
46
47impl PyCatalog {
48 pub fn new(catalog: Arc<dyn CatalogProvider>) -> Self {
49 Self { catalog }
50 }
51}
52
53impl PyDatabase {
54 pub fn new(database: Arc<dyn SchemaProvider>) -> Self {
55 Self { database }
56 }
57}
58
59impl PyTable {
60 pub fn new(table: Arc<dyn TableProvider>) -> Self {
61 Self { table }
62 }
63
64 pub fn table(&self) -> Arc<dyn TableProvider> {
65 self.table.clone()
66 }
67}
68
69#[pymethods]
70impl PyCatalog {
71 fn names(&self) -> Vec<String> {
72 self.catalog.schema_names()
73 }
74
75 #[pyo3(signature = (name="public"))]
76 fn database(&self, name: &str) -> PyResult<PyDatabase> {
77 match self.catalog.schema(name) {
78 Some(database) => Ok(PyDatabase::new(database)),
79 None => Err(PyKeyError::new_err(format!(
80 "Database with name {name} doesn't exist."
81 ))),
82 }
83 }
84
85 fn __repr__(&self) -> PyResult<String> {
86 Ok(format!(
87 "Catalog(schema_names=[{}])",
88 self.names().join(";")
89 ))
90 }
91}
92
93#[pymethods]
94impl PyDatabase {
95 fn names(&self) -> HashSet<String> {
96 self.database.table_names().into_iter().collect()
97 }
98
99 fn table(&self, name: &str, py: Python) -> PyDataFusionResult<PyTable> {
100 if let Some(table) = wait_for_future(py, self.database.table(name))? {
101 Ok(PyTable::new(table))
102 } else {
103 Err(PyDataFusionError::Common(format!(
104 "Table not found: {name}"
105 )))
106 }
107 }
108
109 fn __repr__(&self) -> PyResult<String> {
110 Ok(format!(
111 "Database(table_names=[{}])",
112 Vec::from_iter(self.names()).join(";")
113 ))
114 }
115
116 }
119
120#[pymethods]
121impl PyTable {
122 #[getter]
124 fn schema(&self, py: Python) -> PyResult<PyObject> {
125 self.table.schema().to_pyarrow(py)
126 }
127
128 #[getter]
130 fn kind(&self) -> &str {
131 match self.table.table_type() {
132 TableType::Base => "physical",
133 TableType::View => "view",
134 TableType::Temporary => "temporary",
135 }
136 }
137
138 fn __repr__(&self) -> PyResult<String> {
139 let kind = self.kind();
140 Ok(format!("Table(kind={kind})"))
141 }
142
143 }