Skip to main content

alopex_embedded/
catalog.rs

//! Unity Catalog-like metadata store for embedded usage.
2
3use std::collections::{BTreeMap, HashMap};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{OnceLock, RwLock};
6
7use crate::{Error, Result};
8
/// Lightweight, cacheable subset of a table's metadata.
///
/// Holds only what scan/write paths need, so a cache hit avoids cloning the full
/// table description (columns included).
#[derive(Clone, Debug)]
pub struct CachedTableInfo {
    /// Where the table's data lives, if a location was recorded.
    pub storage_location: Option<String>,
    /// Upper-case data source format name (for example `"PARQUET"`).
    pub format: String,
}
18
/// Process-wide generation counter for the table-info cache.
///
/// Starts at zero; DDL operations advance it, which marks every cache entry recorded under
/// an earlier value as stale.
static TABLE_INFO_CACHE_EPOCH: AtomicU64 = AtomicU64::new(0_u64);
21
22/// Cache entry with epoch for staleness detection.
23#[derive(Clone)]
24struct CacheEntry {
25    info: CachedTableInfo,
26    epoch: u64,
27}
28
29fn table_info_cache() -> &'static RwLock<HashMap<String, CacheEntry>> {
30    static CACHE: OnceLock<RwLock<HashMap<String, CacheEntry>>> = OnceLock::new();
31    CACHE.get_or_init(|| RwLock::new(HashMap::new()))
32}
33
34fn invalidate_cache() {
35    TABLE_INFO_CACHE_EPOCH.fetch_add(1, Ordering::Relaxed);
36}
37
/// Public description of a catalog.
#[derive(Debug, Clone)]
pub struct CatalogInfo {
    /// Name identifying the catalog.
    pub name: String,
    /// Free-form comment, if one was recorded.
    pub comment: Option<String>,
    /// Root storage path for the catalog, if one was recorded.
    pub storage_root: Option<String>,
}
48
/// Public description of a namespace (schema) inside a catalog.
#[derive(Debug, Clone)]
pub struct NamespaceInfo {
    /// Name identifying the namespace within its catalog.
    pub name: String,
    /// Name of the catalog that contains this namespace.
    pub catalog_name: String,
    /// Free-form comment, if one was recorded.
    pub comment: Option<String>,
    /// Root storage path for the namespace, if one was recorded.
    pub storage_root: Option<String>,
}
61
/// Public description of a single table column.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// Name of the column.
    pub name: String,
    /// Name of the column's data type.
    pub type_name: String,
    /// Position of the column within the table schema.
    pub position: usize,
    /// `true` when the column accepts null values.
    pub nullable: bool,
    /// Free-form comment, if one was recorded.
    pub comment: Option<String>,
}
76
77/// Table metadata.
78#[derive(Clone, Debug)]
79pub struct TableInfo {
80    /// Table name.
81    pub name: String,
82    /// Owning catalog name.
83    pub catalog_name: String,
84    /// Owning namespace name.
85    pub namespace_name: String,
86    /// Optional storage location.
87    pub storage_location: Option<String>,
88    /// Optional data source format.
89    pub data_source_format: Option<String>,
90    /// Column definitions.
91    pub columns: Vec<ColumnInfo>,
92}
93
94#[derive(Debug, Default)]
95struct CatalogStore {
96    catalogs: BTreeMap<String, CatalogEntry>,
97}
98
99#[derive(Debug)]
100struct CatalogEntry {
101    info: CatalogInfo,
102    namespaces: BTreeMap<String, NamespaceEntry>,
103}
104
105#[derive(Debug)]
106struct NamespaceEntry {
107    info: NamespaceInfo,
108    tables: BTreeMap<String, TableInfo>,
109}
110
111fn catalog_store() -> &'static RwLock<CatalogStore> {
112    static STORE: OnceLock<RwLock<CatalogStore>> = OnceLock::new();
113    STORE.get_or_init(|| RwLock::new(CatalogStore::default()))
114}
115
/// Stateless entry point for reading and mutating catalog metadata.
///
/// All state lives in a process-wide store; this type only namespaces the associated functions.
pub struct Catalog;
118
119impl Catalog {
120    /// List all catalogs.
121    pub fn list_catalogs() -> Result<Vec<CatalogInfo>> {
122        let guard = catalog_store()
123            .read()
124            .map_err(|_| Error::CatalogLockPoisoned)?;
125        Ok(guard
126            .catalogs
127            .values()
128            .map(|entry| entry.info.clone())
129            .collect())
130    }
131
132    /// List all namespaces within a catalog.
133    pub fn list_namespaces(catalog_name: &str) -> Result<Vec<NamespaceInfo>> {
134        let guard = catalog_store()
135            .read()
136            .map_err(|_| Error::CatalogLockPoisoned)?;
137        let catalog = guard
138            .catalogs
139            .get(catalog_name)
140            .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
141        Ok(catalog
142            .namespaces
143            .values()
144            .map(|entry| entry.info.clone())
145            .collect())
146    }
147
148    /// List all tables within a namespace.
149    pub fn list_tables(catalog_name: &str, namespace_name: &str) -> Result<Vec<TableInfo>> {
150        let guard = catalog_store()
151            .read()
152            .map_err(|_| Error::CatalogLockPoisoned)?;
153        let catalog = guard
154            .catalogs
155            .get(catalog_name)
156            .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
157        let namespace = catalog.namespaces.get(namespace_name).ok_or_else(|| {
158            Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
159        })?;
160        Ok(namespace.tables.values().cloned().collect::<Vec<_>>())
161    }
162
163    /// Get a table metadata entry.
164    pub fn get_table_info(
165        catalog_name: &str,
166        namespace_name: &str,
167        table_name: &str,
168    ) -> Result<TableInfo> {
169        let guard = catalog_store()
170            .read()
171            .map_err(|_| Error::CatalogLockPoisoned)?;
172        let catalog = guard
173            .catalogs
174            .get(catalog_name)
175            .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
176        let namespace = catalog.namespaces.get(namespace_name).ok_or_else(|| {
177            Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
178        })?;
179        namespace.tables.get(table_name).cloned().ok_or_else(|| {
180            Error::TableNotFound(format!(
181                "{}.{}.{}",
182                catalog_name, namespace_name, table_name
183            ))
184        })
185    }
186
187    /// Get cached table info for scan/write operations.
188    /// Uses an internal cache to avoid repeated catalog lookups.
189    pub fn get_table_info_cached(
190        catalog_name: &str,
191        namespace_name: &str,
192        table_name: &str,
193    ) -> Result<CachedTableInfo> {
194        let cache_key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
195        let current_epoch = TABLE_INFO_CACHE_EPOCH.load(Ordering::Relaxed);
196
197        // Check cache first
198        if let Ok(cache) = table_info_cache().read() {
199            if let Some(entry) = cache.get(&cache_key) {
200                if entry.epoch == current_epoch {
201                    return Ok(entry.info.clone());
202                }
203            }
204        }
205
206        // Cache miss or stale: fetch from catalog and update cache
207        let info = Self::get_table_info(catalog_name, namespace_name, table_name)?;
208        let cached = CachedTableInfo {
209            storage_location: info.storage_location.clone(),
210            format: info
211                .data_source_format
212                .as_deref()
213                .unwrap_or("PARQUET")
214                .to_ascii_uppercase(),
215        };
216
217        // Update cache
218        if let Ok(mut cache) = table_info_cache().write() {
219            cache.insert(
220                cache_key,
221                CacheEntry {
222                    info: cached.clone(),
223                    epoch: current_epoch,
224                },
225            );
226        }
227
228        Ok(cached)
229    }
230
231    /// Create a catalog.
232    pub fn create_catalog(name: &str) -> Result<()> {
233        invalidate_cache();
234        let mut guard = catalog_store()
235            .write()
236            .map_err(|_| Error::CatalogLockPoisoned)?;
237        if guard.catalogs.contains_key(name) {
238            return Err(Error::CatalogAlreadyExists(name.to_string()));
239        }
240        guard.catalogs.insert(
241            name.to_string(),
242            CatalogEntry {
243                info: CatalogInfo {
244                    name: name.to_string(),
245                    comment: None,
246                    storage_root: None,
247                },
248                namespaces: BTreeMap::new(),
249            },
250        );
251        Ok(())
252    }
253
254    /// Delete a catalog.
255    pub fn delete_catalog(name: &str) -> Result<()> {
256        invalidate_cache();
257        let mut guard = catalog_store()
258            .write()
259            .map_err(|_| Error::CatalogLockPoisoned)?;
260        if guard.catalogs.remove(name).is_none() {
261            return Err(Error::CatalogNotFound(name.to_string()));
262        }
263        Ok(())
264    }
265
266    /// Create a namespace.
267    pub fn create_namespace(catalog_name: &str, namespace_name: &str) -> Result<()> {
268        invalidate_cache();
269        let mut guard = catalog_store()
270            .write()
271            .map_err(|_| Error::CatalogLockPoisoned)?;
272        let catalog = guard
273            .catalogs
274            .get_mut(catalog_name)
275            .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
276        if catalog.namespaces.contains_key(namespace_name) {
277            return Err(Error::NamespaceAlreadyExists(
278                catalog_name.to_string(),
279                namespace_name.to_string(),
280            ));
281        }
282        catalog.namespaces.insert(
283            namespace_name.to_string(),
284            NamespaceEntry {
285                info: NamespaceInfo {
286                    name: namespace_name.to_string(),
287                    catalog_name: catalog_name.to_string(),
288                    comment: None,
289                    storage_root: None,
290                },
291                tables: BTreeMap::new(),
292            },
293        );
294        Ok(())
295    }
296
297    /// Delete a namespace.
298    pub fn delete_namespace(catalog_name: &str, namespace_name: &str) -> Result<()> {
299        invalidate_cache();
300        let mut guard = catalog_store()
301            .write()
302            .map_err(|_| Error::CatalogLockPoisoned)?;
303        let catalog = guard
304            .catalogs
305            .get_mut(catalog_name)
306            .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
307        if catalog.namespaces.remove(namespace_name).is_none() {
308            return Err(Error::NamespaceNotFound(
309                catalog_name.to_string(),
310                namespace_name.to_string(),
311            ));
312        }
313        Ok(())
314    }
315
316    /// Create a table.
317    pub fn create_table(
318        catalog_name: &str,
319        namespace_name: &str,
320        table_name: &str,
321        columns: Vec<ColumnInfo>,
322        storage_location: Option<String>,
323        data_source_format: Option<String>,
324    ) -> Result<()> {
325        invalidate_cache();
326        if let Some(format) = data_source_format.as_deref() {
327            if format != "parquet" {
328                return Err(Error::UnsupportedDataSourceFormat(format.to_string()));
329            }
330        }
331        let mut guard = catalog_store()
332            .write()
333            .map_err(|_| Error::CatalogLockPoisoned)?;
334        let catalog = guard
335            .catalogs
336            .get_mut(catalog_name)
337            .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
338        let namespace = catalog.namespaces.get_mut(namespace_name).ok_or_else(|| {
339            Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
340        })?;
341        if namespace.tables.contains_key(table_name) {
342            return Err(Error::TableAlreadyExists(format!(
343                "{}.{}.{}",
344                catalog_name, namespace_name, table_name
345            )));
346        }
347        namespace.tables.insert(
348            table_name.to_string(),
349            TableInfo {
350                name: table_name.to_string(),
351                catalog_name: catalog_name.to_string(),
352                namespace_name: namespace_name.to_string(),
353                storage_location,
354                data_source_format,
355                columns,
356            },
357        );
358        Ok(())
359    }
360
361    /// Delete a table.
362    pub fn delete_table(catalog_name: &str, namespace_name: &str, table_name: &str) -> Result<()> {
363        invalidate_cache();
364        let mut guard = catalog_store()
365            .write()
366            .map_err(|_| Error::CatalogLockPoisoned)?;
367        let catalog = guard
368            .catalogs
369            .get_mut(catalog_name)
370            .ok_or_else(|| Error::CatalogNotFound(catalog_name.to_string()))?;
371        let namespace = catalog.namespaces.get_mut(namespace_name).ok_or_else(|| {
372            Error::NamespaceNotFound(catalog_name.to_string(), namespace_name.to_string())
373        })?;
374        if namespace.tables.remove(table_name).is_none() {
375            return Err(Error::TableNotFound(format!(
376                "{}.{}.{}",
377                catalog_name, namespace_name, table_name
378            )));
379        }
380        Ok(())
381    }
382}