nornir 0.4.23

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Content-addressed blob writes for the artifact `*_blobs` tables.
//!
//! The artifact-snapshot machinery historizes index/dwarf/docs bytes as
//! `(snapshot_id, filename, bytes, byte_len, sha256)` rows. Two snapshots of an
//! index that barely changed — or two files within one snapshot that happen to
//! be byte-identical — would otherwise store the *same bytes* more than once.
//!
//! This module dedups at write time: before appending a batch of blob rows, drop
//! any whose `sha256` is already stored in the table. The payload is then written
//! at most once per distinct content hash; later references are satisfied by the
//! existing row (the snapshot row's `blob_count` / manifest still records *which*
//! blobs a snapshot logically contains, so dedup is transparent to readers that
//! fetch by content hash).
//!
//! Pure helper logic — the hash-set diff is unit-tested in isolation; the live
//! iceberg read/write wrappers are exercised by an ignored round-trip test.

use std::collections::HashSet;

use anyhow::Result;
use arrow::array::{Array, Int32Array, LargeBinaryArray, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::Catalog;

use super::iceberg::IcebergWarehouse;

/// One blob to (maybe) store: a logical filename, its bytes, and the content
/// hash that decides dedup identity.
///
/// All fields are public, so a row can also be built as a struct literal; in
/// that case nothing in this type checks that `sha256` actually matches `bytes`.
/// Prefer [`BlobRow::new`], which derives the hash from the payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlobRow {
    /// Logical filename the payload is recorded under.
    pub filename: String,
    /// Raw payload bytes.
    pub bytes: Vec<u8>,
    /// Content hash of `bytes`; this string alone is the dedup key. When built
    /// via [`BlobRow::new`] it is the lowercase hex form from [`sha256_hex`].
    pub sha256: String,
}

impl BlobRow {
    /// Construct a row from a filename and its payload.
    ///
    /// The `sha256` field is derived from `bytes` via [`sha256_hex`], so the
    /// hash and the payload of the returned row always agree.
    pub fn new(filename: impl Into<String>, bytes: Vec<u8>) -> Self {
        // Hash while `bytes` is still only borrowed; it is moved into the row below.
        let digest = sha256_hex(&bytes);
        let name: String = filename.into();
        Self {
            filename: name,
            bytes,
            sha256: digest,
        }
    }
}

/// Lowercase hex sha256 of `bytes`.
///
/// The result is always 64 characters: two hex digits per digest byte.
pub fn sha256_hex(bytes: &[u8]) -> String {
    use sha2::{Digest, Sha256};
    use std::fmt::Write;

    let mut hasher = Sha256::new();
    hasher.update(bytes);
    let digest = hasher.finalize();

    let mut hex = String::with_capacity(digest.len() * 2);
    for byte in digest {
        // Format straight into the preallocated buffer instead of building a
        // throwaway `String` per byte and copying it over.
        write!(hex, "{byte:02x}").expect("writing to a String cannot fail");
    }
    hex
}

/// Keep only the blobs whose content hash is **not** already stored.
///
/// Dedups against `existing` *and* against earlier rows in the same input, so a
/// single batch carrying the same bytes under two filenames writes those bytes
/// once. The first occurrence of a hash wins and input order is preserved.
///
/// Identity is the row's `sha256` string only; `bytes` is never compared.
///
/// Returns `(to_write, skipped)` where `skipped` counts the rows elided.
pub fn dedup_against(
    rows: impl IntoIterator<Item = BlobRow>,
    existing: &HashSet<String>,
) -> (Vec<BlobRow>, usize) {
    // Hashes first seen in *this* batch. Kept separate from `existing` so the
    // stored set (which can hold every hash in the table) is only probed by
    // reference and never cloned.
    let mut seen_in_batch: HashSet<String> = HashSet::new();
    let mut to_write = Vec::new();
    let mut skipped = 0usize;
    for row in rows {
        // Short-circuit: a hash already stored never needs to enter
        // `seen_in_batch`, so no clone is paid for it.
        let duplicate =
            existing.contains(&row.sha256) || !seen_in_batch.insert(row.sha256.clone());
        if duplicate {
            skipped += 1;
        } else {
            to_write.push(row);
        }
    }
    (to_write, skipped)
}

/// The set of `sha256` content hashes already stored in `blob_table`.
///
/// The scan projects only the `sha256` column, so it does not ask for the blob
/// payload column just to learn which hashes exist. Null hash cells are skipped
/// rather than being recorded as a hash.
///
/// # Errors
///
/// Fails if the table cannot be loaded or scanned, or if a scanned batch has no
/// `sha256` column that downcasts to a `StringArray`.
pub async fn existing_blob_hashes_async(
    wh: &IcebergWarehouse,
    blob_table: &str,
) -> Result<HashSet<String>> {
    let table = wh.catalog().load_table(&wh.table_ident(blob_table)).await?;
    let batches: Vec<RecordBatch> = table
        .scan()
        .select(["sha256"])
        .build()?
        .to_arrow()
        .await?
        .try_collect()
        .await?;
    let mut out = HashSet::new();
    for batch in &batches {
        let sha = batch
            .column_by_name("sha256")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow::anyhow!("blob table missing `sha256` column"))?;
        // `iter()` yields `Option<&str>`; flattening drops null cells.
        out.extend(sha.iter().flatten().map(str::to_owned));
    }
    Ok(out)
}

/// Append `blobs` to `blob_table` under `snapshot_id`, **content-deduped**: bytes
/// already present in the table (by sha256) are skipped — the same payload is
/// never written twice. Returns the number of rows actually written.
///
/// Matches the `artifact_blobs` shape: `(snapshot_id, filename, bytes, byte_len,
/// sha256)`.
pub async fn put_blobs_dedup_async(
    wh: &IcebergWarehouse,
    blob_table: &str,
    snapshot_id: &str,
    blobs: Vec<BlobRow>,
) -> Result<usize> {
    let existing = existing_blob_hashes_async(wh, blob_table).await?;
    let (to_write, _skipped) = dedup_against(blobs, &existing);
    if to_write.is_empty() {
        return Ok(0);
    }
    let table = wh.catalog().load_table(&wh.table_ident(blob_table)).await?;
    let schema =
        std::sync::Arc::new(iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?);
    let n = to_write.len();
    let ids = vec![snapshot_id.to_string(); n];
    let names: Vec<String> = to_write.iter().map(|b| b.filename.clone()).collect();
    let payloads: Vec<&[u8]> = to_write.iter().map(|b| b.bytes.as_slice()).collect();
    let lens: Vec<i32> = to_write.iter().map(|b| b.bytes.len() as i32).collect();
    let hashes: Vec<String> = to_write.iter().map(|b| b.sha256.clone()).collect();
    let cols: Vec<std::sync::Arc<dyn Array>> = vec![
        std::sync::Arc::new(StringArray::from(ids)),
        std::sync::Arc::new(StringArray::from(names)),
        std::sync::Arc::new(LargeBinaryArray::from(payloads)),
        std::sync::Arc::new(Int32Array::from(lens)),
        std::sync::Arc::new(StringArray::from(hashes)),
    ];
    let batch = RecordBatch::try_new(schema, cols)?;
    super::iceberg::append_batch(wh.catalog(), table, batch).await?;
    Ok(n)
}

/// Blocking counterpart of [`put_blobs_dedup_async`].
///
/// Builds the async write with the same arguments and hands it to
/// `wh.block_on`; the result (rows written, or the error) is returned unchanged.
pub fn put_blobs_dedup(
    wh: &IcebergWarehouse,
    blob_table: &str,
    snapshot_id: &str,
    blobs: Vec<BlobRow>,
) -> Result<usize> {
    let write = put_blobs_dedup_async(wh, blob_table, snapshot_id, blobs);
    wh.block_on(write)
}

#[cfg(test)]
mod tests {
    //! Unit tests for the pure helpers: hashing and the hash-set diff. Nothing
    //! here touches a warehouse.

    use super::*;

    #[test]
    fn sha256_is_stable_and_distinguishes_content() {
        let a = sha256_hex(b"hello");
        assert_eq!(a, sha256_hex(b"hello"), "same bytes → same hash");
        assert_ne!(a, sha256_hex(b"world"), "different bytes → different hash");
        assert_eq!(a.len(), 64, "hex sha256 is 64 chars");
    }

    #[test]
    fn sha256_matches_known_vectors_in_lowercase_hex() {
        // Pin the exact encoding: dedup identity is the string itself, so a
        // change of case or digest would silently stop matching stored rows.
        assert_eq!(
            sha256_hex(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert_eq!(
            sha256_hex(b"hello"),
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
    }

    #[test]
    fn blob_row_new_hashes_its_payload() {
        let row = BlobRow::new("a.bin", b"hello".to_vec());
        assert_eq!(row.filename, "a.bin");
        assert_eq!(row.bytes, b"hello");
        assert_eq!(row.sha256, sha256_hex(b"hello"));
    }

    #[test]
    fn dedup_skips_bytes_already_stored() {
        let existing: HashSet<String> = [sha256_hex(b"AAA")].into_iter().collect();
        let rows = vec![
            BlobRow::new("a.bin", b"AAA".to_vec()), // already stored → skip
            BlobRow::new("b.bin", b"BBB".to_vec()), // new → write
        ];
        let (to_write, skipped) = dedup_against(rows, &existing);
        assert_eq!(skipped, 1);
        assert_eq!(to_write.len(), 1);
        assert_eq!(to_write[0].filename, "b.bin");
    }

    #[test]
    fn dedup_collapses_identical_bytes_within_one_batch() {
        // Two filenames, identical content — the bytes must be written ONCE.
        let rows = vec![
            BlobRow::new("first.idx", b"PAYLOAD".to_vec()),
            BlobRow::new("second.idx", b"PAYLOAD".to_vec()),
            BlobRow::new("third.idx", b"OTHER".to_vec()),
        ];
        let (to_write, skipped) = dedup_against(rows, &HashSet::new());
        assert_eq!(skipped, 1, "the duplicate PAYLOAD row is elided");
        assert_eq!(to_write.len(), 2);
        // The surviving PAYLOAD row is the first occurrence.
        assert_eq!(to_write[0].filename, "first.idx");
        let payload_rows = to_write.iter().filter(|r| r.bytes == b"PAYLOAD").count();
        assert_eq!(payload_rows, 1, "identical bytes stored exactly once");
    }

    #[test]
    fn dedup_noop_when_all_new() {
        let rows = vec![BlobRow::new("x", b"1".to_vec()), BlobRow::new("y", b"2".to_vec())];
        let (to_write, skipped) = dedup_against(rows, &HashSet::new());
        assert_eq!(skipped, 0);
        assert_eq!(to_write.len(), 2);
    }
}