use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result, anyhow};
use znippy_common::arrow as arrow58;
use znippy_common::index::lookup_schema;
use znippy_common::{ArchiveMetaSink, ArrowIpcSink, GroupKey};
use arrow_array::RecordBatch as RecordBatch57;
use crate::scan_base_batches_57;
/// Summary of a completed [`seal`] run.
///
/// Derives `PartialEq`/`Eq`/`Default` in addition to `Debug`/`Clone` so that
/// reports can be compared directly (e.g. in tests) and zero-initialised.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SealReport {
    /// Number of distinct `relative_path` values across the sealed batches.
    pub files: u64,
    /// Total number of rows across the sealed batches.
    pub rows: u64,
    /// Bytes copied from the blob sidecar into the output file.
    pub blob_bytes_copied: u64,
    /// Value returned by the metadata sink when it finished; presumably the
    /// total size of the sealed output — confirm against the sink.
    pub sealed_total_bytes: u64,
}
/// Builds a sealed archive at `out` from a blob sidecar and warehouse metadata.
///
/// Steps, in order:
/// 1. scan the base metadata batches for `namespace` under `warehouse`;
/// 2. bridge them to the native arrow representation and count the rows and
///    the distinct `relative_path` values;
/// 3. copy `sidecar` to `out`;
/// 4. reopen `out` for writing and push the bridged batches through an
///    [`ArrowIpcSink`] as a single sub-index, then finish the sink.
///
/// # Errors
///
/// Fails if the tokio runtime cannot be built, the scan fails or yields no
/// batches, the bridge or path count fails, the copy or reopen fails, or the
/// sink reports an error. `out` is not removed when a later step fails.
pub fn seal(sidecar: &Path, warehouse: &Path, namespace: &str, out: &Path) -> Result<SealReport> {
    // The metadata scan is async, so it is driven on a runtime owned by this call.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("seal: building tokio runtime")?;

    let scanned = runtime.block_on(scan_base_batches_57(warehouse, namespace))?;
    if scanned.is_empty() {
        return Err(anyhow!(
            "seal: no metadata tables found for namespace '{namespace}' in {}",
            warehouse.display()
        ));
    }

    // Everything that can fail on the metadata side happens before `out` is touched.
    let native = bridge_to_native_base(&scanned)?;
    let mut rows: u64 = 0;
    for batch in &native {
        rows += batch.num_rows() as u64;
    }
    let files = distinct_paths(&native)?;

    // NOTE(review): `std::fs::copy` also copies the permission bits, so a
    // read-only sidecar may make the reopen-for-write below fail — confirm.
    let describe_copy = || {
        format!(
            "seal: copying blob sidecar {} → {}",
            sidecar.display(),
            out.display()
        )
    };
    let blob_bytes_copied = std::fs::copy(sidecar, out).with_context(describe_copy)?;

    let writable = std::fs::OpenOptions::new()
        .write(true)
        .open(out)
        .with_context(|| format!("seal: reopening output {} for write", out.display()))?;
    let out_file = Arc::new(writable);

    // The sink receives the copied byte count; presumably that is where the
    // metadata starts in `out` — confirm against `ArrowIpcSink`.
    let mut sink: Box<dyn ArchiveMetaSink> =
        Box::new(ArrowIpcSink::new(Arc::clone(&out_file), blob_bytes_copied));

    let schema = lookup_schema();
    // All batches go into one sub-index keyed by pkg_type 0 and empty names.
    let key = GroupKey {
        pkg_type: 0,
        repo: String::new(),
        module_name: String::new(),
    };
    sink.push_subindex(schema.as_ref(), &native, key)?;
    let sealed_total_bytes = sink.finish()?;

    Ok(SealReport {
        files,
        rows,
        blob_bytes_copied,
        sealed_total_bytes,
    })
}
fn bridge_to_native_base(
batches57: &[RecordBatch57],
) -> Result<Vec<arrow58::record_batch::RecordBatch>> {
use arrow58::array::FixedSizeBinaryArray;
use arrow58::datatypes::DataType as Dt58;
let mut ipc: Vec<u8> = Vec::new();
{
let schema = batches57
.first()
.map(|b| b.schema())
.ok_or_else(|| anyhow!("seal: empty batch set"))?;
let mut w = arrow_ipc::writer::StreamWriter::try_new(&mut ipc, schema.as_ref())
.map_err(|e| anyhow!("seal: ipc57 writer: {e}"))?;
for b in batches57 {
w.write(b).map_err(|e| anyhow!("seal: ipc57 write: {e}"))?;
}
w.finish().map_err(|e| anyhow!("seal: ipc57 finish: {e}"))?;
}
let reader =
arrow58::ipc::reader::StreamReader::try_new(std::io::Cursor::new(&ipc[..]), None)
.map_err(|e| anyhow!("seal: ipc58 reader: {e}"))?;
let target = lookup_schema(); let mut out = Vec::new();
for batch in reader {
let b = batch.map_err(|e| anyhow!("seal: ipc58 batch decode: {e}"))?;
let mut cols: Vec<arrow58::array::ArrayRef> = Vec::with_capacity(target.fields().len());
for field in target.fields() {
let src = b
.column_by_name(field.name())
.ok_or_else(|| anyhow!("seal: sealed batch missing column '{}'", field.name()))?;
let col = if src.data_type() == field.data_type() {
src.clone()
} else if matches!(field.data_type(), Dt58::FixedSizeBinary(32)) {
fixed_from_binary(src.as_ref())?
} else {
return Err(anyhow!(
"seal: unexpected source type {:?} for base column '{}'",
src.data_type(),
field.name()
));
};
cols.push(col);
}
out.push(
arrow58::record_batch::RecordBatch::try_new(target.clone(), cols)
.map_err(|e| anyhow!("seal: rebuild native batch: {e}"))?,
);
}
for b in &out {
let ck = b
.column_by_name("checksum")
.ok_or_else(|| anyhow!("seal: no checksum column"))?;
ck.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.ok_or_else(|| anyhow!("seal: checksum not FixedSizeBinary(32) after bridge"))?;
}
Ok(out)
}
fn fixed_from_binary(arr: &dyn arrow58::array::Array) -> Result<arrow58::array::ArrayRef> {
use arrow58::array::{
Array, BinaryArray, FixedSizeBinaryBuilder, LargeBinaryArray,
};
use arrow58::datatypes::DataType as Dt58;
let mut builder = FixedSizeBinaryBuilder::with_capacity(arr.len(), 32);
match arr.data_type() {
Dt58::LargeBinary => {
let a = arr
.as_any()
.downcast_ref::<LargeBinaryArray>()
.ok_or_else(|| anyhow!("seal: checksum not LargeBinaryArray"))?;
for i in 0..a.len() {
builder
.append_value(a.value(i))
.map_err(|e| anyhow!("seal: checksum not 32 bytes at row {i}: {e}"))?;
}
}
Dt58::Binary => {
let a = arr
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| anyhow!("seal: checksum not BinaryArray"))?;
for i in 0..a.len() {
builder
.append_value(a.value(i))
.map_err(|e| anyhow!("seal: checksum not 32 bytes at row {i}: {e}"))?;
}
}
other => return Err(anyhow!("seal: unexpected checksum type {other:?}")),
}
Ok(Arc::new(builder.finish()))
}
/// Counts the distinct values of the `relative_path` column across `batches`.
///
/// The set borrows the strings straight from the arrays, so no `String` is
/// allocated per row; the count is the same as with owned keys.
///
/// Values are read with `value(row)` without consulting the validity bitmap,
/// so a null slot contributes whatever string the array stores for it
/// (presumably the empty string — confirm if nulls can occur).
///
/// # Errors
///
/// Fails if any batch lacks a `relative_path` column or the column is not a
/// `StringArray`.
fn distinct_paths(batches: &[arrow58::record_batch::RecordBatch]) -> Result<u64> {
    use arrow58::array::{Array, StringArray};
    use std::collections::HashSet;

    let mut seen: HashSet<&str> = HashSet::new();
    for batch in batches {
        let paths = batch
            .column_by_name("relative_path")
            .ok_or_else(|| anyhow!("seal: missing relative_path"))?
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| anyhow!("seal: relative_path not StringArray"))?;
        // `paths` is a shared reference tied to `batches`; moving a copy of it
        // into the closure lets the yielded `&str`s live as long as `seen`.
        seen.extend((0..paths.len()).map(move |row| paths.value(row)));
    }
    Ok(seen.len() as u64)
}