Skip to main content

supertable_core/
catalog.rs

//! # SuperTable Catalog
//!
//! This module defines the catalog abstraction for SuperTable. The catalog
//! is responsible for:
//!
//! - **Namespace management**: Organizing tables into hierarchical namespaces
//! - **Table metadata tracking**: Storing the location of current metadata files
//! - **Atomic commits**: Ensuring ACID guarantees through compare-and-swap
//!
//! ## Catalog Implementations
//!
//! SuperTable supports multiple catalog backends:
//!
//! - **In-Memory**: For testing and development
//! - **SQLite**: For single-node deployments
//! - **PostgreSQL**: For production multi-node deployments
//! - **REST**: For integration with existing Iceberg REST catalogs
//!
//! ## Concurrency
//!
//! All catalog implementations must support atomic compare-and-swap operations
//! to enable optimistic concurrency control. This is typically achieved through:
//!
//! - Database transactions with row-level locking
//! - Conditional writes (e.g., etags, version numbers)
//! - Distributed consensus (e.g., Raft, Paxos)
27
28use async_trait::async_trait;
29use std::collections::HashMap;
30use std::sync::Arc;
31use thiserror::Error;
32use tokio::sync::RwLock;
33
34use crate::metadata::TableMetadata;
35
36
37/// Errors that can occur during catalog operations.
38#[derive(Debug, Error)]
39pub enum CatalogError {
40    /// The requested table was not found.
41    #[error("table not found: {0}")]
42    TableNotFound(String),
43
44    /// The requested namespace was not found.
45    #[error("namespace not found: {0}")]
46    NamespaceNotFound(String),
47
48    /// The table already exists.
49    #[error("table already exists: {0}")]
50    TableAlreadyExists(String),
51
52    /// The namespace already exists.
53    #[error("namespace already exists: {0}")]
54    NamespaceAlreadyExists(String),
55
56    /// A conflict occurred during an atomic operation.
57    #[error("commit conflict: expected version {expected}, found {actual}")]
58    CommitConflict { expected: i64, actual: i64 },
59
60    /// An I/O error occurred.
61    #[error("io error: {0}")]
62    Io(#[from] std::io::Error),
63
64    /// A serialization error occurred.
65    #[error("serialization error: {0}")]
66    Serialization(String),
67}
68
69/// Result type for catalog operations.
70pub type CatalogResult<T> = Result<T, CatalogError>;
71
/// Identifies a table by a namespace path plus a table name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdentifier {
    /// Namespace path, outermost level first (e.g. `["db", "schema"]`).
    /// May be empty.
    pub namespace: Vec<String>,

    /// The table name.
    pub name: String,
}

impl TableIdentifier {
    /// Creates an identifier from any sequence of namespace levels and a name.
    ///
    /// Both the levels and the name accept anything convertible into `String`.
    pub fn new(
        namespace: impl IntoIterator<Item = impl Into<String>>,
        name: impl Into<String>,
    ) -> Self {
        Self {
            namespace: namespace.into_iter().map(|level| level.into()).collect(),
            name: name.into(),
        }
    }

    /// Creates an identifier whose namespace has exactly one level.
    pub fn of(namespace: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            namespace: vec![namespace.into()],
            name: name.into(),
        }
    }

    /// Returns the fully qualified name: the namespace levels and the table
    /// name joined with `.`, or just the table name when the namespace is
    /// empty.
    ///
    /// Levels or names that themselves contain `.` are not escaped, so the
    /// result does not round-trip through [`TableIdentifier::parse`] in that
    /// case.
    pub fn full_name(&self) -> String {
        if self.namespace.is_empty() {
            self.name.clone()
        } else {
            format!("{}.{}", self.namespace.join("."), self.name)
        }
    }

    /// Parses a fully qualified name into a `TableIdentifier`.
    ///
    /// The text after the last `.` becomes the table name; everything before
    /// it is split on `.` into namespace levels. Input without any `.` yields
    /// an empty namespace. No validation is performed, so empty segments
    /// (e.g. from `"a..b"` or `"a."`) are kept as empty strings.
    pub fn parse(full_name: &str) -> Self {
        // `rsplit_once` isolates the final segment without collecting every
        // segment into a temporary vector or indexing into it.
        match full_name.rsplit_once('.') {
            Some((namespace, name)) => Self {
                namespace: namespace.split('.').map(str::to_string).collect(),
                name: name.to_string(),
            },
            None => Self {
                namespace: Vec::new(),
                name: full_name.to_string(),
            },
        }
    }
}

impl std::fmt::Display for TableIdentifier {
    /// Writes the same text as [`TableIdentifier::full_name`], streaming the
    /// parts directly into the formatter instead of building a `String` first.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for level in &self.namespace {
            write!(f, "{}.", level)?;
        }
        f.write_str(&self.name)
    }
}
134
/// String key/value properties attached to a namespace.
pub type NamespaceProperties = HashMap<String, String>;
137
138/// The main catalog trait for managing tables.
139#[async_trait]
140pub trait Catalog: Send + Sync {
141    /// Returns the name of this catalog.
142    fn name(&self) -> &str;
143
144    /// Lists all namespaces, optionally under a parent namespace.
145    async fn list_namespaces(&self, parent: Option<&[String]>) -> CatalogResult<Vec<Vec<String>>>;
146
147    /// Creates a new namespace.
148    async fn create_namespace(
149        &self,
150        namespace: &[String],
151        properties: NamespaceProperties,
152    ) -> CatalogResult<()>;
153
154    /// Drops a namespace (must be empty).
155    async fn drop_namespace(&self, namespace: &[String]) -> CatalogResult<()>;
156
157    /// Gets namespace properties.
158    async fn namespace_properties(
159        &self,
160        namespace: &[String],
161    ) -> CatalogResult<NamespaceProperties>;
162
163    /// Lists all tables in a namespace.
164    async fn list_tables(&self, namespace: &[String]) -> CatalogResult<Vec<TableIdentifier>>;
165
166    /// Creates a new table.
167    async fn create_table(
168        &self,
169        identifier: &TableIdentifier,
170        metadata: TableMetadata,
171    ) -> CatalogResult<TableMetadata>;
172
173    /// Loads a table's metadata.
174    async fn load_table(&self, identifier: &TableIdentifier) -> CatalogResult<TableMetadata>;
175
176    /// Drops a table.
177    async fn drop_table(&self, identifier: &TableIdentifier, purge: bool) -> CatalogResult<()>;
178
179    /// Renames a table.
180    async fn rename_table(&self, from: &TableIdentifier, to: &TableIdentifier)
181    -> CatalogResult<()>;
182
183    /// Checks if a table exists.
184    async fn table_exists(&self, identifier: &TableIdentifier) -> CatalogResult<bool>;
185
186    /// Atomically updates table metadata using compare-and-swap.
187    ///
188    /// # Arguments
189    ///
190    /// * `identifier` - The table to update
191    /// * `base_version` - The expected current version (sequence number)
192    /// * `metadata` - The new metadata to commit
193    ///
194    /// # Returns
195    ///
196    /// The committed metadata on success.
197    async fn commit_table(
198        &self,
199        identifier: &TableIdentifier,
200        base_version: i64,
201        metadata: TableMetadata,
202    ) -> CatalogResult<TableMetadata>;
203}
204
205/// An in-memory catalog implementation for testing and development.
206///
207/// This implementation stores all metadata in memory and is not persistent.
208/// It's useful for unit tests and local development.
209#[derive(Debug)]
210pub struct InMemoryCatalog {
211    name: String,
212    namespaces: RwLock<HashMap<Vec<String>, NamespaceProperties>>,
213    tables: RwLock<HashMap<TableIdentifier, TableMetadata>>,
214}
215
216impl InMemoryCatalog {
217    /// Creates a new in-memory catalog.
218    pub fn new(name: impl Into<String>) -> Self {
219        Self {
220            name: name.into(),
221            namespaces: RwLock::new(HashMap::new()),
222            tables: RwLock::new(HashMap::new()),
223        }
224    }
225
226    /// Creates a new catalog wrapped in an Arc for sharing.
227    pub fn shared(name: impl Into<String>) -> Arc<Self> {
228        Arc::new(Self::new(name))
229    }
230}
231
232#[async_trait]
233impl Catalog for InMemoryCatalog {
234    fn name(&self) -> &str {
235        &self.name
236    }
237
238    async fn list_namespaces(&self, parent: Option<&[String]>) -> CatalogResult<Vec<Vec<String>>> {
239        let namespaces: tokio::sync::RwLockReadGuard<HashMap<Vec<String>, crate::catalog::NamespaceProperties>> = self.namespaces.read().await;
240        let result: Vec<Vec<String>> = namespaces
241            .keys()
242            .filter(|ns| match parent {
243                Some(p) => ns.starts_with(p) && ns.len() == p.len() + 1,
244                None => ns.len() == 1,
245            })
246            .cloned()
247            .collect();
248        Ok(result)
249    }
250
251    async fn create_namespace(
252        &self,
253        namespace: &[String],
254        properties: NamespaceProperties,
255    ) -> CatalogResult<()> {
256        let mut namespaces: tokio::sync::RwLockWriteGuard<HashMap<Vec<String>, crate::catalog::NamespaceProperties>> = self.namespaces.write().await;
257        let ns_vec = namespace.to_vec();
258
259        if namespaces.contains_key(&ns_vec) {
260            return Err(CatalogError::NamespaceAlreadyExists(namespace.join(".")));
261        }
262
263        namespaces.insert(ns_vec, properties);
264        Ok(())
265    }
266
267    async fn drop_namespace(&self, namespace: &[String]) -> CatalogResult<()> {
268        let mut namespaces = self.namespaces.write().await;
269        let ns_vec = namespace.to_vec();
270
271        if namespaces.remove(&ns_vec).is_none() {
272            return Err(CatalogError::NamespaceNotFound(namespace.join(".")));
273        }
274
275        Ok(())
276    }
277
278    async fn namespace_properties(
279        &self,
280        namespace: &[String],
281    ) -> CatalogResult<NamespaceProperties> {
282        let namespaces = self.namespaces.read().await;
283        namespaces
284            .get(namespace)
285            .cloned()
286            .ok_or_else(|| CatalogError::NamespaceNotFound(namespace.join(".")))
287    }
288
289    async fn list_tables(&self, namespace: &[String]) -> CatalogResult<Vec<TableIdentifier>> {
290        let tables = self.tables.read().await;
291        let result: Vec<TableIdentifier> = tables
292            .keys()
293            .filter(|id| id.namespace == namespace)
294            .cloned()
295            .collect();
296        Ok(result)
297    }
298
299    async fn create_table(
300        &self,
301        identifier: &TableIdentifier,
302        metadata: TableMetadata,
303    ) -> CatalogResult<TableMetadata> {
304        let mut tables = self.tables.write().await;
305
306        if tables.contains_key(identifier) {
307            return Err(CatalogError::TableAlreadyExists(identifier.full_name()));
308        }
309
310        tables.insert(identifier.clone(), metadata.clone());
311        Ok(metadata)
312    }
313
314    async fn load_table(&self, identifier: &TableIdentifier) -> CatalogResult<TableMetadata> {
315        let tables = self.tables.read().await;
316        tables
317            .get(identifier)
318            .cloned()
319            .ok_or_else(|| CatalogError::TableNotFound(identifier.full_name()))
320    }
321
322    async fn drop_table(&self, identifier: &TableIdentifier, _purge: bool) -> CatalogResult<()> {
323        let mut tables = self.tables.write().await;
324
325        if tables.remove(identifier).is_none() {
326            return Err(CatalogError::TableNotFound(identifier.full_name()));
327        }
328
329        Ok(())
330    }
331
332    async fn rename_table(
333        &self,
334        from: &TableIdentifier,
335        to: &TableIdentifier,
336    ) -> CatalogResult<()> {
337        let mut tables = self.tables.write().await;
338
339        let metadata = tables
340            .remove(from)
341            .ok_or_else(|| CatalogError::TableNotFound(from.full_name()))?;
342
343        if tables.contains_key(to) {
344            // Restore the original table if target exists
345            tables.insert(from.clone(), metadata);
346            return Err(CatalogError::TableAlreadyExists(to.full_name()));
347        }
348
349        tables.insert(to.clone(), metadata);
350        Ok(())
351    }
352
353    async fn table_exists(&self, identifier: &TableIdentifier) -> CatalogResult<bool> {
354        let tables: tokio::sync::RwLockReadGuard<HashMap<TableIdentifier, TableMetadata>> = self.tables.read().await;
355        Ok(tables.contains_key(identifier))
356    }
357
358    async fn commit_table(
359        &self,
360        identifier: &TableIdentifier,
361        base_version: i64,
362        metadata: TableMetadata,
363    ) -> CatalogResult<TableMetadata> {
364        let mut tables: tokio::sync::RwLockWriteGuard<HashMap<TableIdentifier, TableMetadata>> = self.tables.write().await;
365
366        let current = tables
367            .get(identifier)
368            .ok_or_else(|| CatalogError::TableNotFound(identifier.full_name()))?;
369
370        // Check for conflicts using sequence number
371        if current.last_sequence_number != base_version {
372            return Err(CatalogError::CommitConflict {
373                expected: base_version,
374                actual: current.last_sequence_number,
375            });
376        }
377
378        tables.insert(identifier.clone(), metadata.clone());
379        Ok(metadata)
380    }
381}
382
383
384
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{Schema, Type};

    /// Builds minimal table metadata (one `id` column) rooted at `location`.
    fn sample_metadata(location: &str) -> TableMetadata {
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .build();
        TableMetadata::builder(location, schema).build()
    }

    #[tokio::test]
    async fn test_create_namespace() {
        let catalog = InMemoryCatalog::new("test");

        catalog
            .create_namespace(&["db".into()], HashMap::new())
            .await
            .unwrap();

        let namespaces = catalog.list_namespaces(None).await.unwrap();
        assert_eq!(namespaces.len(), 1);
        assert_eq!(namespaces[0], vec!["db".to_string()]);
    }

    #[tokio::test]
    async fn test_create_duplicate_namespace_fails() {
        let catalog = InMemoryCatalog::new("test");
        let namespace = vec!["db".to_string()];

        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let result = catalog.create_namespace(&namespace, HashMap::new()).await;
        assert!(matches!(
            result,
            Err(CatalogError::NamespaceAlreadyExists(_))
        ));
    }

    #[tokio::test]
    async fn test_create_table() {
        let catalog = InMemoryCatalog::new("test");

        let identifier = TableIdentifier::of("db", "users");
        let metadata = sample_metadata("s3://bucket/users");

        catalog.create_table(&identifier, metadata).await.unwrap();

        let exists = catalog.table_exists(&identifier).await.unwrap();
        assert!(exists);
    }

    #[tokio::test]
    async fn test_create_duplicate_table_fails() {
        let catalog = InMemoryCatalog::new("test");

        let identifier = TableIdentifier::of("db", "users");
        let metadata = sample_metadata("s3://bucket/users");

        catalog
            .create_table(&identifier, metadata.clone())
            .await
            .unwrap();

        let result = catalog.create_table(&identifier, metadata).await;
        assert!(matches!(result, Err(CatalogError::TableAlreadyExists(_))));
    }

    #[tokio::test]
    async fn test_load_missing_table_fails() {
        let catalog = InMemoryCatalog::new("test");
        let identifier = TableIdentifier::of("db", "missing");

        let result = catalog.load_table(&identifier).await;
        assert!(matches!(result, Err(CatalogError::TableNotFound(_))));
    }

    #[tokio::test]
    async fn test_rename_table() {
        let catalog = InMemoryCatalog::new("test");

        let from = TableIdentifier::of("db", "users");
        let to = TableIdentifier::of("db", "customers");

        catalog
            .create_table(&from, sample_metadata("s3://bucket/users"))
            .await
            .unwrap();
        catalog.rename_table(&from, &to).await.unwrap();

        assert!(!catalog.table_exists(&from).await.unwrap());
        assert!(catalog.table_exists(&to).await.unwrap());
    }

    #[tokio::test]
    async fn test_commit_conflict() {
        let catalog = InMemoryCatalog::new("test");

        let identifier = TableIdentifier::of("db", "users");
        let metadata = sample_metadata("s3://bucket/users");

        catalog
            .create_table(&identifier, metadata.clone())
            .await
            .unwrap();

        // Try to commit with wrong base version
        let result = catalog.commit_table(&identifier, 999, metadata).await;

        assert!(matches!(result, Err(CatalogError::CommitConflict { .. })));
    }

    // Parsing is synchronous, so this test does not need a tokio runtime.
    #[test]
    fn test_table_identifier_parsing() {
        let id = TableIdentifier::parse("db.schema.users");
        assert_eq!(id.namespace, vec!["db", "schema"]);
        assert_eq!(id.name, "users");
        assert_eq!(id.full_name(), "db.schema.users");

        let id2 = TableIdentifier::parse("simple_table");
        assert!(id2.namespace.is_empty());
        assert_eq!(id2.name, "simple_table");
    }
}