use std::collections::HashSet;
use anyhow::Result;
use arrow::array::{Array, Int32Array, LargeBinaryArray, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::Catalog;
use super::iceberg::IcebergWarehouse;
/// One blob to be stored: a file name, its raw bytes, and the hex SHA-256 of those bytes.
///
/// All fields are public, so `sha256` is only guaranteed to match `bytes` when the row
/// is constructed through [`BlobRow::new`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlobRow {
    /// File name recorded alongside the payload.
    pub filename: String,
    /// Raw payload bytes.
    pub bytes: Vec<u8>,
    /// Hex SHA-256 of `bytes` as produced by [`sha256_hex`]; `dedup_against` keys on it.
    pub sha256: String,
}

impl BlobRow {
    /// Builds a row from a name and payload, computing `sha256` from `bytes`.
    pub fn new(filename: impl Into<String>, bytes: Vec<u8>) -> Self {
        let digest = sha256_hex(&bytes);
        BlobRow {
            filename: filename.into(),
            sha256: digest,
            bytes,
        }
    }
}
/// Returns the SHA-256 digest of `bytes` as a 64-character lowercase hex string.
pub fn sha256_hex(bytes: &[u8]) -> String {
    use sha2::{Digest, Sha256};
    use std::fmt::Write as _;

    let digest = Sha256::digest(bytes);
    let mut s = String::with_capacity(digest.len() * 2);
    for b in digest.iter() {
        // Formatting straight into the preallocated buffer avoids the temporary
        // `String` that `format!` would allocate for every byte. Writing into a
        // `String` cannot fail, so the `fmt::Result` is discarded.
        let _ = write!(s, "{b:02x}");
    }
    s
}
/// Splits `rows` into the rows that must be written and a count of skipped duplicates.
///
/// A row is skipped when its `sha256` is already in `existing` (content stored earlier)
/// or when an earlier row from the same `rows` iterator had the same hash. The first
/// occurrence wins, and the returned rows keep their input order.
///
/// `existing` is only read. It is neither cloned nor modified, so its size does not
/// add copying cost to the call.
pub fn dedup_against(
    rows: impl IntoIterator<Item = BlobRow>,
    existing: &HashSet<String>,
) -> (Vec<BlobRow>, usize) {
    // Track only hashes first seen in *this* batch. The potentially huge `existing` set
    // is consulted by reference instead of being copied up front.
    let mut seen_in_batch: HashSet<String> = HashSet::new();
    let mut to_write = Vec::new();
    let mut skipped = 0usize;
    for r in rows {
        // Short-circuit: a hash already stored is never inserted into the batch set.
        let is_new = !existing.contains(&r.sha256) && seen_in_batch.insert(r.sha256.clone());
        if is_new {
            to_write.push(r);
        } else {
            skipped += 1;
        }
    }
    (to_write, skipped)
}
/// Loads the set of `sha256` values already stored in `blob_table`.
///
/// Only the `sha256` column is projected, so the scan does not read the blob payloads.
/// Batches are consumed one at a time as they arrive rather than all being collected
/// into memory first. Null hash values are ignored.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a returned batch has no
/// `sha256` column that downcasts to `StringArray`.
pub async fn existing_blob_hashes_async(
    wh: &IcebergWarehouse,
    blob_table: &str,
) -> Result<HashSet<String>> {
    let table = wh.catalog().load_table(&wh.table_ident(blob_table)).await?;
    // Project just the hash column: the payload column can be arbitrarily large and
    // is not needed to answer "which hashes exist".
    let mut stream = table
        .scan()
        .select(["sha256"])
        .build()?
        .to_arrow()
        .await?;
    let mut out = HashSet::new();
    while let Some(b) = stream.try_next().await? {
        let sha = b
            .column_by_name("sha256")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .ok_or_else(|| anyhow::anyhow!("blob table missing `sha256` column"))?;
        // A null slot identifies no content; skip it rather than recording "".
        out.extend(sha.iter().flatten().map(str::to_owned));
    }
    Ok(out)
}
pub async fn put_blobs_dedup_async(
wh: &IcebergWarehouse,
blob_table: &str,
snapshot_id: &str,
blobs: Vec<BlobRow>,
) -> Result<usize> {
let existing = existing_blob_hashes_async(wh, blob_table).await?;
let (to_write, _skipped) = dedup_against(blobs, &existing);
if to_write.is_empty() {
return Ok(0);
}
let table = wh.catalog().load_table(&wh.table_ident(blob_table)).await?;
let schema =
std::sync::Arc::new(iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?);
let n = to_write.len();
let ids = vec![snapshot_id.to_string(); n];
let names: Vec<String> = to_write.iter().map(|b| b.filename.clone()).collect();
let payloads: Vec<&[u8]> = to_write.iter().map(|b| b.bytes.as_slice()).collect();
let lens: Vec<i32> = to_write.iter().map(|b| b.bytes.len() as i32).collect();
let hashes: Vec<String> = to_write.iter().map(|b| b.sha256.clone()).collect();
let cols: Vec<std::sync::Arc<dyn Array>> = vec![
std::sync::Arc::new(StringArray::from(ids)),
std::sync::Arc::new(StringArray::from(names)),
std::sync::Arc::new(LargeBinaryArray::from(payloads)),
std::sync::Arc::new(Int32Array::from(lens)),
std::sync::Arc::new(StringArray::from(hashes)),
];
let batch = RecordBatch::try_new(schema, cols)?;
super::iceberg::append_batch(wh.catalog(), table, batch).await?;
Ok(n)
}
/// Blocking counterpart of [`put_blobs_dedup_async`], driven to completion by
/// `wh.block_on`.
///
/// Returns exactly what the async version returns.
pub fn put_blobs_dedup(
    wh: &IcebergWarehouse,
    blob_table: &str,
    snapshot_id: &str,
    blobs: Vec<BlobRow>,
) -> Result<usize> {
    let pending = put_blobs_dedup_async(wh, blob_table, snapshot_id, blobs);
    wh.block_on(pending)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sha256_is_stable_and_distinguishes_content() {
        let a = sha256_hex(b"hello");
        assert_eq!(a, sha256_hex(b"hello"), "same bytes → same hash");
        assert_ne!(a, sha256_hex(b"world"), "different bytes → different hash");
        assert_eq!(a.len(), 64, "hex sha256 is 64 chars");
    }

    /// Pins the actual digest and encoding (lowercase, zero-padded hex), which the
    /// stability test above cannot catch.
    #[test]
    fn sha256_matches_known_vectors() {
        assert_eq!(
            sha256_hex(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert_eq!(
            sha256_hex(b"hello"),
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
    }

    #[test]
    fn blob_row_new_hashes_its_bytes() {
        let r = BlobRow::new("f.bin", b"hello".to_vec());
        assert_eq!(r.filename, "f.bin");
        assert_eq!(r.bytes, b"hello");
        assert_eq!(r.sha256, sha256_hex(b"hello"));
    }

    #[test]
    fn dedup_skips_bytes_already_stored() {
        let existing: HashSet<String> = [sha256_hex(b"AAA")].into_iter().collect();
        let rows = vec![
            BlobRow::new("a.bin", b"AAA".to_vec()),
            BlobRow::new("b.bin", b"BBB".to_vec()),
        ];
        let (to_write, skipped) = dedup_against(rows, &existing);
        assert_eq!(skipped, 1);
        assert_eq!(to_write.len(), 1);
        assert_eq!(to_write[0].filename, "b.bin");
    }

    #[test]
    fn dedup_collapses_identical_bytes_within_one_batch() {
        let rows = vec![
            BlobRow::new("first.idx", b"PAYLOAD".to_vec()),
            BlobRow::new("second.idx", b"PAYLOAD".to_vec()),
            BlobRow::new("third.idx", b"OTHER".to_vec()),
        ];
        let (to_write, skipped) = dedup_against(rows, &HashSet::new());
        assert_eq!(skipped, 1, "the duplicate PAYLOAD row is elided");
        assert_eq!(to_write.len(), 2);
        assert_eq!(to_write[0].filename, "first.idx");
        let payload_rows = to_write.iter().filter(|r| r.bytes == b"PAYLOAD").count();
        assert_eq!(payload_rows, 1, "identical bytes stored exactly once");
    }

    #[test]
    fn dedup_noop_when_all_new() {
        let rows = vec![BlobRow::new("x", b"1".to_vec()), BlobRow::new("y", b"2".to_vec())];
        let (to_write, skipped) = dedup_against(rows, &HashSet::new());
        assert_eq!(skipped, 0);
        assert_eq!(to_write.len(), 2);
    }

    #[test]
    fn dedup_empty_input_writes_nothing() {
        let existing: HashSet<String> = [sha256_hex(b"AAA")].into_iter().collect();
        let (to_write, skipped) = dedup_against(Vec::new(), &existing);
        assert!(to_write.is_empty());
        assert_eq!(skipped, 0);
    }
}