skade-katalog 0.1.8

The katalog under skade: an embedded, single-file ACID Apache Iceberg catalog (redb) with time-travel snapshots and atomic multi-table release commits — the Norns recording the world's icebergs.
Documentation
// Apache-2.0 licensed.

//! Multi-table atomic commits.
//!
//! Iceberg's per-table `Transaction::commit` only guarantees atomicity for
//! one table. For warehouses where several tables represent one logical unit
//! (e.g. `bench_runs` + `dep_graph` + `components` published together as one
//! release), partial visibility of half-committed data is undesirable.
//!
//! redb gives every write transaction global atomicity across all tables in
//! the database. This module exposes that as
//! [`RedbCatalog::atomic_release`]: prepare N `TableCommit`s, write their
//! new metadata files to object storage, then flip every catalog pointer in
//! a single redb transaction. Either *all* table heads advance or *none* do.

use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, Result, TableCommit};
use redb::ReadableTable;

use crate::catalog::RedbCatalog;
use crate::error::map_redb;
use crate::keys::table_key;
use crate::store::TABLES;

impl RedbCatalog {
    /// Atomically commit a batch of [`TableCommit`]s.
    ///
    /// Behaviour:
    ///
    /// 1. For each commit: load the current table, apply requirements +
    ///    updates, and stage the new `TableMetadata`. If any apply fails the
    ///    whole batch fails before any I/O is performed.
    /// 2. Each staged metadata JSON is written to the table's storage via
    ///    `FileIO`. **These writes are not transactional** — on failure here,
    ///    earlier staged files are left as orphan blobs (Iceberg orphan-file
    ///    cleanup, if you run it, will collect them).
    /// 3. A single redb write transaction reads every current
    ///    metadata-pointer, compares it against the base used at stage time,
    ///    and on full agreement flips them all to the staged locations. If
    ///    any one pointer has moved since stage (concurrent writer), the
    ///    entire transaction aborts with `CatalogCommitConflicts` and no
    ///    pointers move.
    ///
    /// Returns the new `Table` handles in the same order as the input.
    pub async fn atomic_release(
        &self,
        commits: impl IntoIterator<Item = TableCommit>,
    ) -> Result<Vec<Table>> {
        // Stage phase: load tables, apply commits in memory, capture old/new
        // metadata locations.
        struct Staged {
            ident: iceberg::TableIdent,
            base_metadata_location: String,
            staged_metadata_location: String,
            staged_table: Table,
        }

        let mut staged: Vec<Staged> = Vec::new();
        for commit in commits {
            let ident = commit.identifier().clone();
            let current = self.load_table(&ident).await?;
            let base = current.metadata_location_result()?.to_string();
            let table = commit.apply(current)?;
            let new_loc = table.metadata_location_result()?.to_string();
            staged.push(Staged {
                ident,
                base_metadata_location: base,
                staged_metadata_location: new_loc,
                staged_table: table,
            });
        }

        // Write phase: persist staged metadata blobs. Done outside the redb
        // transaction so we don't hold the write lock during network I/O.
        for s in &staged {
            s.staged_table
                .metadata()
                .write_to(s.staged_table.file_io(), &s.staged_metadata_location)
                .await?;
        }

        // Commit phase: swap all pointers atomically with optimistic checks.
        {
            let db = self.store.db.lock().await;
            let mut write = db.begin_write().map_err(map_redb)?;
            write.set_durability(self.store.durability);
            {
                let mut tables_tbl = write.open_table(TABLES).map_err(map_redb)?;
                for s in &staged {
                    let key = table_key(&self.name, &s.ident);
                    let current = tables_tbl
                        .get(key.as_str())
                        .map_err(map_redb)?
                        .map(|v| v.value().to_string());
                    match current {
                        Some(loc) if loc == s.base_metadata_location => {}
                        Some(_) => {
                            return Err(Error::new(
                                ErrorKind::CatalogCommitConflicts,
                                format!("Commit conflicted for table {} in atomic batch", s.ident),
                            )
                            .with_retryable(true));
                        }
                        None => {
                            return Err(Error::new(
                                ErrorKind::TableNotFound,
                                format!("Table {} disappeared during atomic batch", s.ident),
                            ));
                        }
                    }
                }
                for s in &staged {
                    let key = table_key(&self.name, &s.ident);
                    tables_tbl
                        .insert(key.as_str(), s.staged_metadata_location.as_str())
                        .map_err(map_redb)?;
                }
            }
            // Append every advanced table to the immutable commit log (same txn).
            for s in &staged {
                crate::store::record_commit(
                    &write,
                    &table_key(&self.name, &s.ident),
                    s.staged_table.metadata().current_snapshot_id(),
                    &s.staged_metadata_location,
                )
                .map_err(map_redb)?;
            }
            write.commit().map_err(map_redb)?;
            // L1 write-through: publish all advanced pointers (under the redb
            // write lock, so the batch is visible to readers all-or-nothing
            // relative to other writers).
            for s in &staged {
                self.store.pointers.insert(
                    &table_key(&self.name, &s.ident),
                    &s.staged_metadata_location,
                );
            }
        }
        self.store.maybe_trigger_compaction();

        Ok(staged.into_iter().map(|s| s.staged_table).collect())
    }
}