1use std::collections::{BTreeMap, HashMap};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{OnceLock, RwLock};
6
7use crate::{Error, Result};
8
9#[derive(Debug, Clone)]
12pub struct CachedTableInfo {
13 pub storage_location: Option<String>,
15 pub format: String,
17}
18
19static TABLE_INFO_CACHE_EPOCH: AtomicU64 = AtomicU64::new(0);
21
22#[derive(Clone)]
24struct CacheEntry {
25 info: CachedTableInfo,
26 epoch: u64,
27}
28
29fn table_info_cache() -> &'static RwLock<HashMap<String, CacheEntry>> {
30 static CACHE: OnceLock<RwLock<HashMap<String, CacheEntry>>> = OnceLock::new();
31 CACHE.get_or_init(|| RwLock::new(HashMap::new()))
32}
33
34fn invalidate_cache() {
35 TABLE_INFO_CACHE_EPOCH.fetch_add(1, Ordering::Relaxed);
36}
37
38#[derive(Clone, Debug)]
40pub struct CatalogInfo {
41 pub name: String,
43 pub comment: Option<String>,
45 pub storage_root: Option<String>,
47}
48
49#[derive(Clone, Debug)]
51pub struct NamespaceInfo {
52 pub name: String,
54 pub catalog_name: String,
56 pub comment: Option<String>,
58 pub storage_root: Option<String>,
60}
61
62#[derive(Clone, Debug)]
64pub struct ColumnInfo {
65 pub name: String,
67 pub type_name: String,
69 pub position: usize,
71 pub nullable: bool,
73 pub comment: Option<String>,
75}
76
77#[derive(Clone, Debug)]
79pub struct TableInfo {
80 pub name: String,
82 pub catalog_name: String,
84 pub namespace_name: String,
86 pub storage_location: Option<String>,
88 pub data_source_format: Option<String>,
90 pub columns: Vec<ColumnInfo>,
92}
93
94#[derive(Debug, Default)]
95struct CatalogStore {
96 catalogs: BTreeMap<String, CatalogEntry>,
97}
98
99#[derive(Debug)]
100struct CatalogEntry {
101 info: CatalogInfo,
102 namespaces: BTreeMap<String, NamespaceEntry>,
103}
104
105#[derive(Debug)]
106struct NamespaceEntry {
107 info: NamespaceInfo,
108 tables: BTreeMap<String, TableInfo>,
109}
110
111fn catalog_store() -> &'static RwLock<CatalogStore> {
112 static STORE: OnceLock<RwLock<CatalogStore>> = OnceLock::new();
113 STORE.get_or_init(|| RwLock::new(CatalogStore::default()))
114}
115
116pub struct Catalog;
118
119impl Catalog {
120 pub fn list_catalogs() -> Result<Vec<CatalogInfo>> {
122 let guard = catalog_store()
123 .read()
124 .map_err(|_| Error::CatalogLockPoisoned)?;
125 Ok(guard
126 .catalogs
127 .values()
128 .map(|entry| entry.info.clone())
129 .collect())
130 }
131
132 pub fn list_namespaces(catalog_name: &str) -> Result<Vec<NamespaceInfo>> {
134 let guard = catalog_store()
135 .read()
136 .map_err(|_| Error::CatalogLockPoisoned)?;
137 let catalog = guard
138 .catalogs
139 .get(catalog_name)
140 .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
141 Ok(catalog
142 .namespaces
143 .values()
144 .map(|entry| entry.info.clone())
145 .collect())
146 }
147
148 pub fn list_tables(catalog_name: &str, namespace_name: &str) -> Result<Vec<TableInfo>> {
150 let guard = catalog_store()
151 .read()
152 .map_err(|_| Error::CatalogLockPoisoned)?;
153 let catalog = guard
154 .catalogs
155 .get(catalog_name)
156 .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
157 let namespace = catalog.namespaces.get(namespace_name).ok_or_else(|| {
158 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
159 })?;
160 Ok(namespace.tables.values().cloned().collect::<Vec<_>>())
161 }
162
163 pub fn get_table_info(
165 catalog_name: &str,
166 namespace_name: &str,
167 table_name: &str,
168 ) -> Result<TableInfo> {
169 let guard = catalog_store()
170 .read()
171 .map_err(|_| Error::CatalogLockPoisoned)?;
172 let catalog = guard
173 .catalogs
174 .get(catalog_name)
175 .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
176 let namespace = catalog.namespaces.get(namespace_name).ok_or_else(|| {
177 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
178 })?;
179 namespace.tables.get(table_name).cloned().ok_or_else(|| {
180 Error::TableNotFound(format!(
181 "{}.{}.{}",
182 catalog_name, namespace_name, table_name
183 ))
184 })
185 }
186
187 pub fn get_table_info_cached(
190 catalog_name: &str,
191 namespace_name: &str,
192 table_name: &str,
193 ) -> Result<CachedTableInfo> {
194 let cache_key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
195 let current_epoch = TABLE_INFO_CACHE_EPOCH.load(Ordering::Relaxed);
196
197 if let Ok(cache) = table_info_cache().read() {
199 if let Some(entry) = cache.get(&cache_key) {
200 if entry.epoch == current_epoch {
201 return Ok(entry.info.clone());
202 }
203 }
204 }
205
206 let info = Self::get_table_info(catalog_name, namespace_name, table_name)?;
208 let cached = CachedTableInfo {
209 storage_location: info.storage_location.clone(),
210 format: info
211 .data_source_format
212 .as_deref()
213 .unwrap_or("PARQUET")
214 .to_ascii_uppercase(),
215 };
216
217 if let Ok(mut cache) = table_info_cache().write() {
219 cache.insert(
220 cache_key,
221 CacheEntry {
222 info: cached.clone(),
223 epoch: current_epoch,
224 },
225 );
226 }
227
228 Ok(cached)
229 }
230
231 pub fn create_catalog(name: &str) -> Result<()> {
233 invalidate_cache();
234 let mut guard = catalog_store()
235 .write()
236 .map_err(|_| Error::CatalogLockPoisoned)?;
237 if guard.catalogs.contains_key(name) {
238 return Err(Error::CatalogAlreadyExists(name.to_string()));
239 }
240 guard.catalogs.insert(
241 name.to_string(),
242 CatalogEntry {
243 info: CatalogInfo {
244 name: name.to_string(),
245 comment: None,
246 storage_root: None,
247 },
248 namespaces: BTreeMap::new(),
249 },
250 );
251 Ok(())
252 }
253
254 pub fn delete_catalog(name: &str) -> Result<()> {
256 invalidate_cache();
257 let mut guard = catalog_store()
258 .write()
259 .map_err(|_| Error::CatalogLockPoisoned)?;
260 if guard.catalogs.remove(name).is_none() {
261 return Err(Error::CatalogNotFound(name.to_string()));
262 }
263 Ok(())
264 }
265
266 pub fn create_namespace(catalog_name: &str, namespace_name: &str) -> Result<()> {
268 invalidate_cache();
269 let mut guard = catalog_store()
270 .write()
271 .map_err(|_| Error::CatalogLockPoisoned)?;
272 let catalog = guard
273 .catalogs
274 .get_mut(catalog_name)
275 .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
276 if catalog.namespaces.contains_key(namespace_name) {
277 return Err(Error::NamespaceAlreadyExists(
278 catalog_name.to_string(),
279 namespace_name.to_string(),
280 ));
281 }
282 catalog.namespaces.insert(
283 namespace_name.to_string(),
284 NamespaceEntry {
285 info: NamespaceInfo {
286 name: namespace_name.to_string(),
287 catalog_name: catalog_name.to_string(),
288 comment: None,
289 storage_root: None,
290 },
291 tables: BTreeMap::new(),
292 },
293 );
294 Ok(())
295 }
296
297 pub fn delete_namespace(catalog_name: &str, namespace_name: &str) -> Result<()> {
299 invalidate_cache();
300 let mut guard = catalog_store()
301 .write()
302 .map_err(|_| Error::CatalogLockPoisoned)?;
303 let catalog = guard
304 .catalogs
305 .get_mut(catalog_name)
306 .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
307 if catalog.namespaces.remove(namespace_name).is_none() {
308 return Err(Error::NamespaceNotFound(
309 catalog_name.to_string(),
310 namespace_name.to_string(),
311 ));
312 }
313 Ok(())
314 }
315
316 pub fn create_table(
318 catalog_name: &str,
319 namespace_name: &str,
320 table_name: &str,
321 columns: Vec<ColumnInfo>,
322 storage_location: Option<String>,
323 data_source_format: Option<String>,
324 ) -> Result<()> {
325 invalidate_cache();
326 if let Some(format) = data_source_format.as_deref() {
327 if format != "parquet" {
328 return Err(Error::UnsupportedDataSourceFormat(format.to_string()));
329 }
330 }
331 let mut guard = catalog_store()
332 .write()
333 .map_err(|_| Error::CatalogLockPoisoned)?;
334 let catalog = guard
335 .catalogs
336 .get_mut(catalog_name)
337 .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
338 let namespace = catalog.namespaces.get_mut(namespace_name).ok_or_else(|| {
339 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
340 })?;
341 if namespace.tables.contains_key(table_name) {
342 return Err(Error::TableAlreadyExists(format!(
343 "{}.{}.{}",
344 catalog_name, namespace_name, table_name
345 )));
346 }
347 namespace.tables.insert(
348 table_name.to_string(),
349 TableInfo {
350 name: table_name.to_string(),
351 catalog_name: catalog_name.to_string(),
352 namespace_name: namespace_name.to_string(),
353 storage_location,
354 data_source_format,
355 columns,
356 },
357 );
358 Ok(())
359 }
360
361 pub fn delete_table(catalog_name: &str, namespace_name: &str, table_name: &str) -> Result<()> {
363 invalidate_cache();
364 let mut guard = catalog_store()
365 .write()
366 .map_err(|_| Error::CatalogLockPoisoned)?;
367 let catalog = guard
368 .catalogs
369 .get_mut(catalog_name)
370 .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
371 let namespace = catalog.namespaces.get_mut(namespace_name).ok_or_else(|| {
372 Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
373 })?;
374 if namespace.tables.remove(table_name).is_none() {
375 return Err(Error::TableNotFound(format!(
376 "{}.{}.{}",
377 catalog_name, namespace_name, table_name
378 )));
379 }
380 Ok(())
381 }
382}