Skip to main content

rustberg/catalog/
catalog_ext.rs

1//! Extended catalog trait for Rustberg-specific operations.
2//!
3//! The iceberg-rust `Catalog` trait's `update_table` method requires a `TableCommit`
4//! whose builder is `pub(crate)`. This module provides an extension trait that allows
5//! us to update tables directly from HTTP request payloads.
6
7use std::fmt::Debug;
8
9use async_trait::async_trait;
10use iceberg::table::Table;
11use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent, TableRequirement, TableUpdate};
12
/// Extension trait for Iceberg catalogs that provides direct table update capabilities.
///
/// This trait is needed because the Iceberg REST API receives `TableUpdate` and
/// `TableRequirement` directly in the commit request, but the iceberg-rust crate's
/// `TableCommit` builder is `pub(crate)` and cannot be constructed externally.
///
/// `Catalog` is a supertrait, so every implementor also exposes the regular catalog API.
#[async_trait]
pub trait CatalogExt: Catalog {
    /// Updates a table's metadata directly with the given updates and requirements.
    ///
    /// This method:
    /// 1. Loads the current table
    /// 2. Validates all requirements against current metadata
    /// 3. Applies all updates to build new metadata
    /// 4. Persists the updated table
    ///
    /// # Arguments
    /// * `table_ident` - Identifier of the table to commit to
    /// * `requirements` - Preconditions checked against the table's current metadata
    /// * `updates` - Metadata changes applied, in order, on top of the current metadata
    ///
    /// # Returns
    /// * `Ok(Table)` - The table as returned by the catalog after the commit
    /// * `Err` - If the table cannot be loaded, a requirement fails, an update cannot be
    ///   applied, or the new metadata cannot be persisted
    ///
    /// A failed requirement is meant to reach REST clients as 409 Conflict. This trait
    /// only returns an `Error`; the translation to an HTTP status is not visible here.
    /// NOTE(review): confirm the mapping in the HTTP layer.
    async fn commit_table(
        &self,
        table_ident: &TableIdent,
        requirements: Vec<TableRequirement>,
        updates: Vec<TableUpdate>,
    ) -> Result<Table>;

    /// Updates the metadata location for an existing table in the catalog registry.
    ///
    /// This is used internally by `commit_table` to update the catalog's pointer to the
    /// new metadata file after writing it to storage.
    ///
    /// Unlike `register_table`, this method expects the table to already exist.
    ///
    /// Whether the pointer swap is atomic depends on the implementor: the
    /// `ExtendedCatalog` implementation in this module drops the table entry and then
    /// re-registers it, which is not atomic.
    ///
    /// # Arguments
    /// * `table_ident` - Identifier of the existing table
    /// * `new_metadata_location` - Location of the metadata file the table should point to
    ///
    /// # Returns
    /// * `Ok(Table)` - The table loaded from the new metadata location
    /// * `Err` - If the catalog entry could not be updated
    async fn update_table_metadata_location(
        &self,
        table_ident: &TableIdent,
        new_metadata_location: String,
    ) -> Result<Table>;

    /// Atomically commits changes to multiple tables in a single transaction.
    ///
    /// This is the atomic multi-table commit entry point: implementations are expected to
    /// ensure that all changes succeed or all fail together, using optimistic concurrency
    /// control with retry logic to handle concurrent modifications.
    ///
    /// # Arguments
    /// * `table_changes` - A list of (table_ident, requirements, updates) tuples
    ///
    /// # Returns
    /// * `Ok(Vec<Table>)` - The updated tables after successful commit
    /// * `Err` - If any requirement fails or if the commit cannot be applied atomically
    ///
    /// # Atomicity Guarantee
    /// Either all table changes are applied atomically, or none are. If a conflict
    /// is detected (another transaction modified any of the tables), an implementation
    /// is expected to retry with exponential backoff up to a configured number of retries.
    ///
    /// The `ExtendedCatalog` implementation in this module does not retry: it commits at
    /// most one table and rejects larger requests with `ErrorKind::FeatureUnsupported`.
    async fn commit_tables_atomic(
        &self,
        table_changes: Vec<(TableIdent, Vec<TableRequirement>, Vec<TableUpdate>)>,
    ) -> Result<Vec<Table>>;

    /// Performs a storage backend health check.
    ///
    /// This method validates connectivity to the underlying storage backend
    /// (S3/GCS/Azure/local). Used by the `/ready` endpoint to ensure the catalog can read
    /// and write table metadata.
    ///
    /// The `ExtendedCatalog` implementation in this module performs no I/O and always
    /// reports a healthy `"memory"` backend.
    ///
    /// # Returns
    /// * `Ok(StorageHealthStatus)` - Storage health details including backend type and latency
    /// * `Err` - If the storage backend is unreachable or misconfigured
    async fn storage_health_check(&self) -> Result<StorageHealthStatus>;
}
80
/// Storage backend health status.
///
/// Produced by `CatalogExt::storage_health_check`. The type is a plain value: it can be
/// cloned and compared for equality, which keeps assertions in tests and change detection
/// in readiness probes straightforward.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageHealthStatus {
    /// Backend type (e.g., "s3", "gcs", "azure", "file", "memory")
    pub backend_type: String,
    /// Whether the backend is healthy
    pub healthy: bool,
    /// Health check latency in milliseconds (`unhealthy` sets this to 0)
    pub latency_ms: u64,
    /// Optional status message (`None` for statuses built with `healthy`)
    pub message: Option<String>,
}

impl StorageHealthStatus {
    /// Creates a healthy status.
    ///
    /// # Arguments
    /// * `backend_type` - Name of the storage backend that was probed
    /// * `latency_ms` - How long the probe took, in milliseconds
    pub fn healthy(backend_type: impl Into<String>, latency_ms: u64) -> Self {
        Self {
            backend_type: backend_type.into(),
            healthy: true,
            latency_ms,
            message: None,
        }
    }

    /// Creates an unhealthy status with a message.
    ///
    /// No latency is recorded for a failed probe; `latency_ms` is set to 0.
    ///
    /// # Arguments
    /// * `backend_type` - Name of the storage backend that was probed
    /// * `message` - Description of what went wrong
    pub fn unhealthy(backend_type: impl Into<String>, message: impl Into<String>) -> Self {
        Self {
            backend_type: backend_type.into(),
            healthy: false,
            latency_ms: 0,
            message: Some(message.into()),
        }
    }
}
115
/// Wrapper around any `Catalog` that implements `CatalogExt`.
///
/// The wrapper forwards every `Catalog` method to the wrapped catalog unchanged and
/// layers the `CatalogExt` operations on top of it.
#[derive(Debug)]
pub struct ExtendedCatalog<C: Catalog> {
    /// The wrapped catalog that all operations are delegated to.
    inner: C,
}
121
122impl<C: Catalog> ExtendedCatalog<C> {
123    /// Creates a new extended catalog wrapping the given catalog.
124    pub fn new(inner: C) -> Self {
125        Self { inner }
126    }
127}
128
129#[async_trait]
130impl<C: Catalog + Send + Sync> Catalog for ExtendedCatalog<C> {
131    async fn list_namespaces(
132        &self,
133        parent: Option<&iceberg::NamespaceIdent>,
134    ) -> Result<Vec<iceberg::NamespaceIdent>> {
135        self.inner.list_namespaces(parent).await
136    }
137
138    async fn create_namespace(
139        &self,
140        namespace: &iceberg::NamespaceIdent,
141        properties: std::collections::HashMap<String, String>,
142    ) -> Result<iceberg::Namespace> {
143        self.inner.create_namespace(namespace, properties).await
144    }
145
146    async fn get_namespace(
147        &self,
148        namespace: &iceberg::NamespaceIdent,
149    ) -> Result<iceberg::Namespace> {
150        self.inner.get_namespace(namespace).await
151    }
152
153    async fn namespace_exists(&self, namespace: &iceberg::NamespaceIdent) -> Result<bool> {
154        self.inner.namespace_exists(namespace).await
155    }
156
157    async fn update_namespace(
158        &self,
159        namespace: &iceberg::NamespaceIdent,
160        properties: std::collections::HashMap<String, String>,
161    ) -> Result<()> {
162        self.inner.update_namespace(namespace, properties).await
163    }
164
165    async fn drop_namespace(&self, namespace: &iceberg::NamespaceIdent) -> Result<()> {
166        self.inner.drop_namespace(namespace).await
167    }
168
169    async fn list_tables(&self, namespace: &iceberg::NamespaceIdent) -> Result<Vec<TableIdent>> {
170        self.inner.list_tables(namespace).await
171    }
172
173    async fn create_table(
174        &self,
175        namespace: &iceberg::NamespaceIdent,
176        creation: iceberg::TableCreation,
177    ) -> Result<Table> {
178        self.inner.create_table(namespace, creation).await
179    }
180
181    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
182        self.inner.load_table(table).await
183    }
184
185    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
186        self.inner.drop_table(table).await
187    }
188
189    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
190        self.inner.table_exists(table).await
191    }
192
193    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
194        self.inner.rename_table(src, dest).await
195    }
196
197    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table> {
198        self.inner.register_table(table, metadata_location).await
199    }
200
201    async fn update_table(&self, commit: iceberg::TableCommit) -> Result<Table> {
202        self.inner.update_table(commit).await
203    }
204}
205
206#[async_trait]
207impl<C: Catalog + Send + Sync> CatalogExt for ExtendedCatalog<C> {
208    async fn commit_table(
209        &self,
210        table_ident: &TableIdent,
211        requirements: Vec<TableRequirement>,
212        updates: Vec<TableUpdate>,
213    ) -> Result<Table> {
214        // Load current table
215        let table = self.inner.load_table(table_ident).await?;
216
217        // Check all requirements against current metadata
218        for requirement in &requirements {
219            requirement.check(Some(table.metadata()))?;
220        }
221
222        // Get current metadata location
223        let current_metadata_location = table
224            .metadata_location()
225            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Table has no metadata location"))?;
226
227        // Apply all updates to build new metadata
228        let mut metadata_builder = table
229            .metadata()
230            .clone()
231            .into_builder(Some(current_metadata_location.to_string()));
232
233        for update in updates {
234            metadata_builder = update.apply(metadata_builder)?;
235        }
236
237        // Build the new metadata
238        let new_metadata = metadata_builder.build()?;
239
240        // Generate new metadata location
241        let new_metadata_location = generate_new_metadata_location(current_metadata_location)?;
242
243        // Write the new metadata file to storage
244        // This is critical for persistence - without this, snapshots are lost!
245        new_metadata
246            .metadata
247            .write_to(table.file_io(), &new_metadata_location)
248            .await?;
249
250        // Update the catalog registry with the new metadata location
251        self.update_table_metadata_location(table_ident, new_metadata_location)
252            .await
253    }
254
255    async fn update_table_metadata_location(
256        &self,
257        table_ident: &TableIdent,
258        new_metadata_location: String,
259    ) -> Result<Table> {
260        // HIGH-004: Drop-and-register pattern for MemoryCatalog compatibility.
261        //
262        // ⚠️ WARNING: NOT PRODUCTION SAFE!
263        //
264        // This implementation uses drop-then-register because MemoryCatalog's
265        // register_table returns an error if the table already exists, and there's
266        // no update_table_location method in the Catalog trait.
267        //
268        // RISKS:
269        // - If server crashes between drop and register, the table entry is lost
270        // - The metadata file still exists in storage, allowing manual recovery
271        // - For production, use SlateCatalog which has atomic update_table support
272        //
273        // The order MUST be drop-first because:
274        // 1. MemoryCatalog.register_table fails if table exists
275        // 2. We cannot use a temp name because metadata file path must match
276        //
277        // MITIGATION:
278        // - Log extensively for crash recovery forensics
279        // - SlateCatalog is recommended for all production deployments
280
281        tracing::debug!(
282            table = %table_ident,
283            new_location = %new_metadata_location,
284            "Updating table metadata location using drop-and-register pattern"
285        );
286
287        // Drop the existing table entry
288        self.inner.drop_table(table_ident).await?;
289
290        // Register with the new metadata location
291        // If this fails, the table entry is lost but metadata file remains in storage
292        match self
293            .inner
294            .register_table(table_ident, new_metadata_location.clone())
295            .await
296        {
297            Ok(table) => Ok(table),
298            Err(e) => {
299                tracing::error!(
300                    table = %table_ident,
301                    metadata_location = %new_metadata_location,
302                    error = %e,
303                    "CRITICAL: Table dropped but re-registration failed. \
304                     Table entry is lost. Metadata file still exists at the \
305                     specified location and can be manually recovered."
306                );
307                Err(e)
308            }
309        }
310    }
311
312    async fn commit_tables_atomic(
313        &self,
314        table_changes: Vec<(TableIdent, Vec<TableRequirement>, Vec<TableUpdate>)>,
315    ) -> Result<Vec<Table>> {
316        // Single-table commits are trivially atomic — delegate normally
317        if table_changes.len() <= 1 {
318            let mut results = Vec::with_capacity(table_changes.len());
319            for (ident, reqs, updates) in table_changes {
320                let table = self.commit_table(&ident, reqs, updates).await?;
321                results.push(table);
322            }
323            return Ok(results);
324        }
325
326        // Multi-table atomic commits are NOT supported by the MemoryCatalog backend.
327        // Rather than silently degrading to non-atomic sequential commits (which
328        // could leave partial state on failure), we reject the request explicitly.
329        // Use SlateCatalog with slatedb-storage feature for true atomic multi-table commits.
330        Err(Error::new(
331            ErrorKind::FeatureUnsupported,
332            format!(
333                "Atomic multi-table commit ({} tables) not supported by this catalog backend. \
334                 Enable slatedb-storage feature and use a persistent storage backend for true atomic commits.",
335                table_changes.len()
336            ),
337        ))
338    }
339
340    async fn storage_health_check(&self) -> Result<StorageHealthStatus> {
341        // ExtendedCatalog wraps MemoryCatalog which has no external storage.
342        // We report it as healthy since there's nothing to check.
343        Ok(StorageHealthStatus::healthy("memory", 0))
344    }
345}
346
347/// Generates a new metadata location by incrementing the version number.
348fn generate_new_metadata_location(current_location: &str) -> Result<String> {
349    use std::path::Path;
350
351    let path = Path::new(current_location);
352    let parent = path.parent().map(|p| p.to_string_lossy().to_string());
353    let filename = path
354        .file_name()
355        .and_then(|f| f.to_str())
356        .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Invalid metadata location"))?;
357
358    // Parse version from filename (e.g., "00001-uuid.metadata.json")
359    let version = filename
360        .split('-')
361        .next()
362        .and_then(|v| v.parse::<u32>().ok())
363        .unwrap_or(0);
364
365    let new_version = version + 1;
366    let new_uuid = uuid::Uuid::new_v4();
367    let new_filename = format!("{:05}-{}.metadata.json", new_version, new_uuid);
368
369    match parent {
370        Some(p) if !p.is_empty() => Ok(format!("{}/{}", p, new_filename)),
371        _ => Ok(new_filename),
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// An object-store style location keeps its directory and gets the next version.
    #[test]
    fn test_generate_new_metadata_location() {
        let current = "s3://bucket/warehouse/db/table/metadata/00001-abc.metadata.json";
        let new_loc = generate_new_metadata_location(current).unwrap();

        assert!(new_loc.starts_with("s3://bucket/warehouse/db/table/metadata/00002-"));
        assert!(new_loc.ends_with(".metadata.json"));
    }

    /// A local filesystem location keeps its directory and gets the next version.
    #[test]
    fn test_generate_new_metadata_location_local() {
        let current = "/tmp/warehouse/metadata/00005-xyz.metadata.json";
        let new_loc = generate_new_metadata_location(current).unwrap();

        assert!(new_loc.starts_with("/tmp/warehouse/metadata/00006-"));
        assert!(new_loc.ends_with(".metadata.json"));
    }

    /// A bare file name has no directory, so the result is a bare file name as well.
    #[test]
    fn test_generate_new_metadata_location_without_directory() {
        let current = "00001-abc.metadata.json";
        let new_loc = generate_new_metadata_location(current).unwrap();

        assert!(new_loc.starts_with("00002-"));
        assert!(!new_loc.contains('/'));
        assert!(new_loc.ends_with(".metadata.json"));
    }

    /// A file name without a numeric prefix is treated as version 0.
    #[test]
    fn test_generate_new_metadata_location_unversioned_filename() {
        let current = "/tmp/warehouse/metadata/v3.metadata.json";
        let new_loc = generate_new_metadata_location(current).unwrap();

        assert!(new_loc.starts_with("/tmp/warehouse/metadata/00001-"));
        assert!(new_loc.ends_with(".metadata.json"));
    }
}
396}