znippy-iceberg 0.1.4

Apache Iceberg backend for znippy: IcebergSink (write) + IcebergZnippyReader (read) over a durable, embedded skade catalog.
//! `seal()` — materialise a dynamic, iceberg-backed znippy archive into a static,
//! immutable **native arrow-ipc `.znippy`** artifact.
//!
//! The iceberg metadata store ([`IcebergSink`]/[`IcebergZnippyReader`]) is a
//! *dynamic* store: its sub-index rows live in skade/Iceberg tables in a durable
//! `catalog.redb` warehouse that can be reopened, appended to (a new iceberg
//! commit), and queried after a process restart. `seal()` takes a snapshot of
//! that store and freezes it into the **v0.7 inline container** that
//! [`ArrowIpcSink`] writes: blobs followed by Arrow-IPC sub-indexes, the sorted
//! lookup sub-index, the fst trie, the manifest, and the `ZNPYMIDX` footer.
//!
//! This is holger's "static / drift" backing: the iceberg side is the live,
//! mutable store; the sealed `.znippy` is the immutable, content-addressed
//! artifact you ship / pin / archive. After sealing, the artifact opens with the
//! ordinary [`ZnippyArchive`]/`read_znippy_index` reader and every external
//! Arrow tool (DuckDB/Polars) can `SELECT` its sub-indexes directly — the
//! iceberg warehouse is no longer required to read it.
//!
//! ## Metadata-only, blobs reused (no recompress)
//! The compressed blob bytes are **never touched**. A `.znippy` sidecar written
//! with the iceberg sink is *pure blobs* (`[blob_0]…[blob_N]`): its metadata went
//! to the warehouse, not into the file. `seal()` copies those blob bytes verbatim
//! into the output `.znippy`, then appends only the metadata tail. The
//! `blob_offset`/`blob_size`/`fdata_offset` columns from the iceberg tables stay
//! valid because the blob region is copied byte-for-byte at the same offsets
//! (blobs start at offset 0 in both formats).
//!
//! [`ZnippyArchive`]: znippy_common::ZnippyArchive

use std::path::Path;
use std::sync::Arc;

use anyhow::{Context, Result, anyhow};

use znippy_common::arrow as arrow58;
use znippy_common::index::lookup_schema;
use znippy_common::{ArchiveMetaSink, ArrowIpcSink, GroupKey};

use arrow_array::RecordBatch as RecordBatch57;

use crate::scan_base_batches_57;

/// Outcome of a [`seal`] call — the cheap, metadata-only accounting that proves
/// no blob bytes were rewritten.
///
/// All figures are plain counters, so the report derives `PartialEq`/`Eq` and
/// two reports can be compared directly (e.g. in tests or drift checks).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SealReport {
    /// Distinct files (relative paths) carried over from the iceberg store.
    pub files: u64,
    /// Total chunk rows sealed (one per `(path, chunk_seq)`).
    pub rows: u64,
    /// Bytes of the blob region copied verbatim from the source sidecar.
    pub blob_bytes_copied: u64,
    /// Final size of the sealed `.znippy` (blobs + metadata tail + footer).
    pub sealed_total_bytes: u64,
}

/// Seal a dynamic, iceberg-backed archive into a static native `.znippy`.
///
/// * `sidecar` — the source `.znippy` blob sidecar written when the archive was
///   compressed with the iceberg metadata sink (pure blobs, no footer).
/// * `warehouse` — the durable skade warehouse holding the metadata tables.
/// * `namespace` — the archive's iceberg namespace (its file stem, what
///   [`IcebergSink::new`](crate::IcebergSink::new) was given).
/// * `out` — destination for the sealed native `.znippy`. It must not be the
///   `sidecar` itself: the blob copy overwrites its destination, so sealing in
///   place would wipe the blobs before they were read.
///
/// Reuses the iceberg reader's table scan for the metadata, copies the blob
/// region verbatim (no recompress), and drives [`ArrowIpcSink`] to write the
/// native v0.7 metadata tail + footer. Returns a [`SealReport`].
///
/// # Errors
/// * `out` resolves to the same existing file as `sidecar`;
/// * the tokio runtime cannot be built or the warehouse scan fails;
/// * the scan finds no metadata tables for `namespace`;
/// * the Arrow 57 → 58 bridge or the checksum rebuild fails;
/// * copying the sidecar, reopening `out`, or writing the metadata tail fails.
///   If the failure happens after the blob copy succeeded, the partially
///   written `out` is removed (best effort) so no footer-less file is left
///   behind looking like a sealed artifact.
pub fn seal(sidecar: &Path, warehouse: &Path, namespace: &str, out: &Path) -> Result<SealReport> {
    // 0. Refuse to seal in place: `std::fs::copy` overwrites its destination,
    //    so copying the sidecar onto itself would silently destroy every blob.
    if is_same_existing_file(sidecar, out) {
        return Err(anyhow!(
            "seal: output {} is the source sidecar; sealing in place would truncate it",
            out.display()
        ));
    }

    // 1. Scan the dynamic iceberg store → base-schema Arrow-57 batches (one row
    //    per chunk; checksum kept as LargeBinary, the cast `seal` inverts below).
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("seal: building tokio runtime")?;
    let batches57 = rt.block_on(scan_base_batches_57(warehouse, namespace))?;
    if batches57.is_empty() {
        return Err(anyhow!(
            "seal: no metadata tables found for namespace '{namespace}' in {}",
            warehouse.display()
        ));
    }

    // 2. Bridge Arrow-57 → Arrow-58 and lift checksum LargeBinary →
    //    FixedSizeBinary(32) so the rows match znippy's native base schema, the
    //    exact shape ArrowIpcSink consumes.
    let batches58 = bridge_to_native_base(&batches57)?;
    let rows: u64 = batches58.iter().map(|b| b.num_rows() as u64).sum();
    let files = distinct_paths(&batches58)?;

    // 3. Copy the blob region verbatim — blobs reused, no recompress. The
    //    iceberg-format sidecar is pure blobs, so its whole length is the blob
    //    region and the first sub-index goes immediately after it.
    let blob_bytes_copied = std::fs::copy(sidecar, out).with_context(|| {
        format!(
            "seal: copying blob sidecar {} → {}",
            sidecar.display(),
            out.display()
        )
    })?;

    // From here on `out` holds our (still footer-less) bytes. Declared before
    // the file handle and sink so those are dropped — and the file closed —
    // before the guard deletes the partial output on an early return.
    let cleanup = PartialOutputGuard { path: out, armed: true };

    // 4. Drive the native sink: one data sub-index of all rows, then finish()
    //    writes the sorted lookup sub-index + fst trie + manifest + ZNPYMIDX
    //    footer — byte-identical to a fresh arrow-ipc compress.
    let out_file = Arc::new(
        std::fs::OpenOptions::new()
            .write(true)
            .open(out)
            .with_context(|| format!("seal: reopening output {} for write", out.display()))?,
    );
    let mut sink: Box<dyn ArchiveMetaSink> =
        Box::new(ArrowIpcSink::new(Arc::clone(&out_file), blob_bytes_copied));

    let schema = lookup_schema(); // the native base schema (Arrow 58).
    sink.push_subindex(
        schema.as_ref(),
        &batches58,
        GroupKey {
            // A sealed snapshot is one logical group; pkg_type 0 / no repo keeps
            // the data sub-index in the normal (non-reserved) read set.
            pkg_type: 0,
            repo: String::new(),
            module_name: String::new(),
        },
    )?;
    let sealed_total_bytes = sink.finish()?;

    // The artifact is complete: keep it.
    cleanup.disarm();

    Ok(SealReport {
        files,
        rows,
        blob_bytes_copied,
        sealed_total_bytes,
    })
}

/// `true` when `a` and `b` both exist and canonicalize to the same path.
///
/// Best effort: hard links to one inode are not detected, and a path that does
/// not exist yet (the usual case for a fresh `out`) can never alias.
fn is_same_existing_file(a: &Path, b: &Path) -> bool {
    match (std::fs::canonicalize(a), std::fs::canonicalize(b)) {
        (Ok(canon_a), Ok(canon_b)) => canon_a == canon_b,
        _ => false,
    }
}

/// Deletes a partially written output file when dropped, unless disarmed.
///
/// `seal` arms one right after the blob copy so that an early `?` return (or a
/// panic) while writing the metadata tail does not leave a blob-only file
/// without a footer at the destination.
struct PartialOutputGuard<'a> {
    path: &'a Path,
    armed: bool,
}

impl PartialOutputGuard<'_> {
    /// Keep the file: called once the sealed artifact is complete.
    fn disarm(mut self) {
        self.armed = false;
    }
}

impl Drop for PartialOutputGuard<'_> {
    fn drop(&mut self) {
        if self.armed {
            // Best effort: the error that triggered the unwind is what the
            // caller needs to see, not a secondary removal failure.
            let _ = std::fs::remove_file(self.path);
        }
    }
}

/// Convert base-schema Arrow-57 batches (from the iceberg scan) to znippy's
/// native Arrow-58 base schema: re-encode via a stable IPC stream (Arrow 57 →
/// 58, the inverse of the sink's 58 → 57 bridge), then cast the checksum column
/// from `LargeBinary` back to `FixedSizeBinary(32)` so `accumulate_lookup`'s
/// downcast succeeds.
fn bridge_to_native_base(
    batches57: &[RecordBatch57],
) -> Result<Vec<arrow58::record_batch::RecordBatch>> {
    use arrow58::array::FixedSizeBinaryArray;
    use arrow58::datatypes::DataType as Dt58;

    // 57 → 58 via IPC (stream format is stable across the adjacent majors).
    let mut ipc: Vec<u8> = Vec::new();
    {
        let schema = batches57
            .first()
            .map(|b| b.schema())
            .ok_or_else(|| anyhow!("seal: empty batch set"))?;
        let mut w = arrow_ipc::writer::StreamWriter::try_new(&mut ipc, schema.as_ref())
            .map_err(|e| anyhow!("seal: ipc57 writer: {e}"))?;
        for b in batches57 {
            w.write(b).map_err(|e| anyhow!("seal: ipc57 write: {e}"))?;
        }
        w.finish().map_err(|e| anyhow!("seal: ipc57 finish: {e}"))?;
    }
    let reader =
        arrow58::ipc::reader::StreamReader::try_new(std::io::Cursor::new(&ipc[..]), None)
            .map_err(|e| anyhow!("seal: ipc58 reader: {e}"))?;

    let target = lookup_schema(); // checksum = FixedSizeBinary(32)
    let mut out = Vec::new();
    for batch in reader {
        let b = batch.map_err(|e| anyhow!("seal: ipc58 batch decode: {e}"))?;
        // Cast each column to the native base type. Only the checksum differs
        // (LargeBinary → FixedSizeBinary(32)); the rest are already exact.
        let mut cols: Vec<arrow58::array::ArrayRef> = Vec::with_capacity(target.fields().len());
        for field in target.fields() {
            let src = b
                .column_by_name(field.name())
                .ok_or_else(|| anyhow!("seal: sealed batch missing column '{}'", field.name()))?;
            let col = if src.data_type() == field.data_type() {
                // The base columns (Utf8/UInt32/UInt64/Boolean) round-trip
                // exactly through the 57→58 IPC bridge, so no cast is needed.
                src.clone()
            } else if matches!(field.data_type(), Dt58::FixedSizeBinary(32)) {
                // Only the checksum differs: the iceberg side stores it as
                // LargeBinary (Iceberg has no fixed-size-binary); rebuild it.
                fixed_from_binary(src.as_ref())?
            } else {
                return Err(anyhow!(
                    "seal: unexpected source type {:?} for base column '{}'",
                    src.data_type(),
                    field.name()
                ));
            };
            cols.push(col);
        }
        out.push(
            arrow58::record_batch::RecordBatch::try_new(target.clone(), cols)
                .map_err(|e| anyhow!("seal: rebuild native batch: {e}"))?,
        );
    }
    // sanity: every checksum lands as a 32-byte FixedSizeBinary
    for b in &out {
        let ck = b
            .column_by_name("checksum")
            .ok_or_else(|| anyhow!("seal: no checksum column"))?;
        ck.as_any()
            .downcast_ref::<FixedSizeBinaryArray>()
            .ok_or_else(|| anyhow!("seal: checksum not FixedSizeBinary(32) after bridge"))?;
    }
    Ok(out)
}

/// Build a `FixedSizeBinary(32)` array from a `LargeBinary`/`Binary` array
/// holding blake3 checksums.
///
/// Iceberg has no fixed-size binary type, so the metadata tables store the
/// checksum as variable-width binary; this rebuilds znippy's native column.
/// Null slots are carried over as nulls instead of being rejected as
/// zero-length values. Whether a null checksum is acceptable is then decided
/// by the target schema when the caller rebuilds the record batch.
///
/// # Errors
/// * `arr` is neither `LargeBinary` nor `Binary`;
/// * a non-null value is not exactly 32 bytes (the message names the row).
fn fixed_from_binary(arr: &dyn arrow58::array::Array) -> Result<arrow58::array::ArrayRef> {
    use arrow58::array::{BinaryArray, FixedSizeBinaryBuilder, LargeBinaryArray};
    use arrow58::datatypes::DataType as Dt58;

    /// Byte width of a blake3 digest, i.e. of the native checksum column.
    const CHECKSUM_WIDTH: i32 = 32;

    // Both source layouts iterate as `Option<&[u8]>` (None = null slot), so
    // one copy loop serves either offset width.
    let values: Box<dyn Iterator<Item = Option<&[u8]>> + '_> = match arr.data_type() {
        Dt58::LargeBinary => Box::new(
            arr.as_any()
                .downcast_ref::<LargeBinaryArray>()
                .ok_or_else(|| anyhow!("seal: checksum not LargeBinaryArray"))?
                .iter(),
        ),
        Dt58::Binary => Box::new(
            arr.as_any()
                .downcast_ref::<BinaryArray>()
                .ok_or_else(|| anyhow!("seal: checksum not BinaryArray"))?
                .iter(),
        ),
        other => return Err(anyhow!("seal: unexpected checksum type {other:?}")),
    };

    let mut builder = FixedSizeBinaryBuilder::with_capacity(arr.len(), CHECKSUM_WIDTH);
    for (row, value) in values.enumerate() {
        match value {
            // append_value enforces the fixed width and errors on any other length.
            Some(bytes) => builder
                .append_value(bytes)
                .map_err(|e| anyhow!("seal: checksum not 32 bytes at row {row}: {e}"))?,
            None => builder.append_null(),
        }
    }
    Ok(Arc::new(builder.finish()))
}

/// Count distinct, non-null `relative_path` values across the native batches.
///
/// This is the `files` figure of [`SealReport`]. There is one chunk row per
/// `(path, chunk_seq)`, so a file split into several chunks is counted once.
/// Paths are borrowed from the batches rather than copied into owned
/// `String`s. Null entries have no path to count, so they are skipped instead
/// of being tallied as a phantom `""` file.
///
/// # Errors
/// Fails if a batch has no `relative_path` column, or if that column is not a
/// `StringArray`.
fn distinct_paths(batches: &[arrow58::record_batch::RecordBatch]) -> Result<u64> {
    use arrow58::array::StringArray;
    use std::collections::HashSet;

    let mut seen: HashSet<&str> = HashSet::new();
    for batch in batches {
        let paths = batch
            .column_by_name("relative_path")
            .ok_or_else(|| anyhow!("seal: missing relative_path"))?
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("seal: relative_path not StringArray"))?;
        // `iter()` yields `None` for null slots; `flatten` drops them.
        seen.extend(paths.iter().flatten());
    }
    Ok(seen.len() as u64)
}