use std::collections::HashMap;
use std::fs::File;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result, anyhow};
use znippy_common::arrow as arrow58;
use znippy_common::codec;
use znippy_common::{ArchiveMetaSink, GroupKey, ZnippyReader};
use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::{BooleanArray, RecordBatch as RecordBatch57, StringArray};
use arrow_schema::{DataType as DataType57, Field as Field57, Schema as Schema57};
/// One sub-index buffered by `IcebergSink::push_subindex` until `finish` writes it out.
struct PendingSubindex {
/// The sub-index's schema and record batches, serialized as an Arrow IPC stream.
ipc_bytes: Vec<u8>,
/// Group key supplied with the sub-index; used to derive the destination table name.
key: GroupKey,
}
/// Archive metadata sink that buffers sub-indexes in memory and writes each one
/// to its own table in a warehouse directory when `finish` is called.
pub struct IcebergSink {
/// Root directory of the warehouse; created on `finish` if it does not exist.
warehouse: PathBuf,
/// Namespace that prefixes every table name produced by this sink.
namespace: String,
/// Sub-indexes received so far, in push order.
pending: Vec<PendingSubindex>,
}
impl IcebergSink {
    /// Creates a sink targeting `warehouse` and `namespace` with nothing buffered yet.
    ///
    /// No I/O happens here; the warehouse directory is only touched by `finish`.
    pub fn new(warehouse: impl Into<PathBuf>, namespace: impl Into<String>) -> Self {
        let warehouse: PathBuf = warehouse.into();
        let namespace: String = namespace.into();
        Self {
            warehouse,
            namespace,
            pending: Vec::new(),
        }
    }
}
impl ArchiveMetaSink for IcebergSink {
    /// Serializes `schema` and `batches` into an in-memory Arrow IPC stream and
    /// queues the bytes, together with `key`, for the later `finish` call.
    ///
    /// # Errors
    /// Fails if the IPC writer cannot be created, a batch cannot be written, or
    /// the stream cannot be finalized.
    fn push_subindex(
        &mut self,
        schema: &arrow58::datatypes::Schema,
        batches: &[arrow58::record_batch::RecordBatch],
        key: GroupKey,
    ) -> Result<()> {
        let mut ipc_bytes = Vec::<u8>::new();
        let mut writer = arrow58::ipc::writer::StreamWriter::try_new(&mut ipc_bytes, schema)
            .map_err(|e| anyhow!("iceberg sink: ipc writer: {e}"))?;
        batches.iter().try_for_each(|batch| {
            writer
                .write(batch)
                .map_err(|e| anyhow!("iceberg sink: ipc write: {e}"))
        })?;
        writer
            .finish()
            .map_err(|e| anyhow!("iceberg sink: ipc finish: {e}"))?;
        // The writer mutably borrows `ipc_bytes`; release it before moving the buffer.
        drop(writer);

        self.pending.push(PendingSubindex { ipc_bytes, key });
        Ok(())
    }

    /// Creates the warehouse directory, writes every queued sub-index on a
    /// freshly built multi-threaded tokio runtime, and returns the total size
    /// in bytes of the warehouse directory as reported by `dir_size_bytes`.
    ///
    /// # Errors
    /// Fails if the directory or the runtime cannot be created, or if writing
    /// any sub-index fails.
    fn finish(self: Box<Self>) -> Result<u64> {
        let warehouse = &self.warehouse;
        std::fs::create_dir_all(warehouse)
            .with_context(|| format!("creating warehouse dir {}", warehouse.display()))?;

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .context("iceberg sink: building tokio runtime")?;
        runtime.block_on(self.write_all())?;

        Ok(dir_size_bytes(warehouse))
    }
}
impl IcebergSink {
    /// Opens the warehouse and writes each queued sub-index to its own table,
    /// in the order the sub-indexes were pushed.
    ///
    /// The position of a sub-index in the queue is part of its table name, so
    /// two sub-indexes with the same key still get distinct tables.
    async fn write_all(&self) -> Result<()> {
        let warehouse = skade::open(&self.warehouse)
            .await
            .map_err(|e| anyhow!("iceberg sink: open warehouse: {e}"))?;

        for (position, pending) in self.pending.iter().enumerate() {
            let table_name = qualified_table_name(&self.namespace, &pending.key, position);
            let outcome = self
                .write_subindex(&warehouse, &table_name, &pending.ipc_bytes)
                .await;
            outcome.with_context(|| format!("iceberg sink: writing sub-index '{table_name}'"))?;
        }
        Ok(())
    }

    /// Decodes one serialized sub-index, converts every batch with
    /// `skade::widen_for_iceberg` followed by `lower_fixed_binary`, and appends
    /// the result to `table_name`, creating the table from the first batch's
    /// schema when needed.
    ///
    /// # Errors
    /// Fails if decoding or conversion fails, if the sub-index contains no
    /// batches, or if the table cannot be created or appended to.
    async fn write_subindex(
        &self,
        wh: &skade::Warehouse,
        table_name: &str,
        ipc_bytes: &[u8],
    ) -> Result<()> {
        let decoded = read_ipc_as_arrow57(ipc_bytes)?;

        let mut widened: Vec<RecordBatch57> = Vec::with_capacity(decoded.len());
        for batch in &decoded {
            let wide = skade::widen_for_iceberg(batch)
                .map_err(|e| anyhow!("widen for iceberg: {e}"))?;
            widened.push(lower_fixed_binary(&wide)?);
        }

        // The table schema is taken from the first converted batch.
        let arrow_schema = match widened.first() {
            Some(first) => first.schema(),
            None => return Err(anyhow!("sub-index has no batches")),
        };

        let mut table = wh
            .table_or_create(table_name, arrow_schema.as_ref())
            .await
            .map_err(|e| anyhow!("table_or_create '{table_name}': {e}"))?;
        table
            .append(&widened)
            .await
            .map_err(|e| anyhow!("append to '{table_name}': {e}"))?;
        Ok(())
    }
}
/// Reader that serves file contents out of a `.znippy` sidecar file, using a
/// per-file chunk index loaded from the tables of a warehouse namespace.
pub struct IcebergZnippyReader {
/// Open handle to the sidecar file that chunk blobs are read from.
sidecar: Arc<File>,
/// Maps each relative path to its total size and its list of chunks.
file_index: HashMap<String, FileEntry>,
}
/// Location and encoding of one chunk of a file, taken from one index row.
struct ChunkInfo {
/// Byte offset in the sidecar file at which this chunk's blob starts.
blob_offset: u64,
/// Number of bytes to read from the sidecar for this chunk.
blob_size: u64,
/// Sort key that orders the chunks of a file during extraction.
/// NOTE(review): presumably the chunk's offset within the original file data — confirm.
fdata_offset: u64,
/// When true, the blob is passed through `codec::decompress_into` before use;
/// otherwise the blob bytes are used as-is.
compressed: bool,
}
/// Everything the reader knows about one file in the archive.
struct FileEntry {
/// Sum of the `uncompressed_size` values of all index rows for this file.
uncompressed_size: u64,
/// The file's chunks, sorted by `fdata_offset` once the index is built.
chunks: Vec<ChunkInfo>,
}
impl IcebergZnippyReader {
    /// Loads the file index from the tables of `namespace` in `warehouse` and
    /// opens the `sidecar` file that holds the chunk blobs.
    ///
    /// The tables are read on a freshly built multi-threaded tokio runtime that
    /// is blocked on from the calling thread. The sidecar is opened only after
    /// the index has been built.
    ///
    /// # Errors
    /// Fails if the runtime cannot be built, the tables cannot be read, the
    /// index is malformed, or the sidecar file cannot be opened.
    pub fn open(sidecar: &Path, warehouse: &Path, namespace: &str) -> Result<Self> {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .context("iceberg reader: building tokio runtime")?;

        let index_batches = runtime.block_on(read_all_tables(warehouse, namespace))?;
        let file_index = build_file_index(&index_batches)?;

        let handle = File::open(sidecar)
            .with_context(|| format!("opening .znippy sidecar {}", sidecar.display()))?;

        Ok(Self {
            sidecar: Arc::new(handle),
            file_index,
        })
    }

    /// Number of distinct relative paths in the index.
    pub fn file_count(&self) -> usize {
        self.file_index.len()
    }
}
/// Reads every table that lives directly in `namespace` and returns its rows,
/// projected to the base index columns and converted back with
/// `skade::unwiden` to the schema from `base_index_schema_arrow57`.
///
/// `namespace` is normalised exactly the way `qualified_table_name` normalises
/// it on the write side (non-ASCII-alphanumeric characters become `_`, then
/// leading and trailing `_` are trimmed). Without this, a namespace such as
/// `my-ns` would be written as `my_ns` but looked up as `my-ns`, and the reader
/// would silently find no tables. The normalisation is idempotent, so callers
/// that already pass the normalised form are unaffected.
///
/// Tables that contain no batches are skipped.
///
/// # Errors
/// Fails if the warehouse cannot be opened, tables cannot be listed, loaded or
/// read, a table lacks one of the base columns, or the conversion back fails.
async fn read_all_tables(warehouse: &Path, namespace: &str) -> Result<Vec<RecordBatch57>> {
    // Keep in sync with the `sanitize` step in `qualified_table_name`.
    let normalized: String = namespace
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
        .collect();
    let namespace = normalized.trim_matches('_');
    // Built once so the filter below does not allocate per table.
    let wanted_namespace = [namespace.to_string()];

    let wh = skade::open(warehouse)
        .await
        .map_err(|e| anyhow!("iceberg reader: open warehouse: {e}"))?;
    let idents = wh
        .table_idents()
        .await
        .map_err(|e| anyhow!("iceberg reader: list tables: {e}"))?;

    let base = base_index_schema_arrow57();
    let mut out = Vec::new();
    for ident in idents {
        // Only tables whose namespace is exactly the single requested level.
        if ident.namespace().as_ref().as_slice() != wanted_namespace {
            continue;
        }
        let qualified = format!("{namespace}.{}", ident.name());
        let table = wh
            .table(&qualified)
            .await
            .map_err(|e| anyhow!("iceberg reader: load '{qualified}': {e}"))?;
        let widened = table
            .read()
            .await
            .map_err(|e| anyhow!("iceberg reader: read '{qualified}': {e}"))?;
        if widened.is_empty() {
            continue;
        }
        let projected = project_to_base(&widened)?;
        let unwidened = skade::unwiden(&projected, base.clone())
            .map_err(|e| anyhow!("iceberg reader: unwiden: {e}"))?;
        out.extend(unwidened);
    }
    Ok(out)
}
fn base_index_schema_arrow57() -> Arc<Schema57> {
Arc::new(Schema57::new(vec![
Field57::new("relative_path", DataType57::Utf8, false),
Field57::new("chunk_seq", DataType57::UInt32, false),
Field57::new("fdata_offset", DataType57::UInt64, false),
Field57::new("compressed", DataType57::Boolean, false),
Field57::new("uncompressed_size", DataType57::UInt64, false),
Field57::new("blob_offset", DataType57::UInt64, false),
Field57::new("blob_size", DataType57::UInt64, false),
Field57::new("checksum", DataType57::LargeBinary, false),
]))
}
/// Names of the columns that `project_to_base` keeps from every table batch,
/// in output order. The names and their order match the fields built by
/// `base_index_schema_arrow57`.
const BASE_COLUMNS: [&str; 8] = [
"relative_path",
"chunk_seq",
"fdata_offset",
"compressed",
"uncompressed_size",
"blob_offset",
"blob_size",
"checksum",
];
/// Rebuilds each batch so that it contains only the `BASE_COLUMNS`, in that
/// order, keeping each column's field definition as found in the input.
///
/// # Errors
/// Fails if a batch lacks one of the base columns or the projected batch
/// cannot be assembled.
fn project_to_base(batches: &[RecordBatch57]) -> Result<Vec<RecordBatch57>> {
    batches
        .iter()
        .map(|batch| -> Result<RecordBatch57> {
            let schema = batch.schema();
            let mut fields = Vec::with_capacity(BASE_COLUMNS.len());
            let mut columns = Vec::with_capacity(BASE_COLUMNS.len());
            for name in BASE_COLUMNS {
                let position = schema
                    .index_of(name)
                    .map_err(|_| anyhow!("sub-index missing base column '{name}'"))?;
                fields.push(schema.field(position).clone());
                columns.push(batch.column(position).clone());
            }
            RecordBatch57::try_new(Arc::new(Schema57::new(fields)), columns)
                .map_err(|e| anyhow!("project base columns: {e}"))
        })
        .collect()
}
fn build_file_index(batches: &[RecordBatch57]) -> Result<HashMap<String, FileEntry>> {
let mut index: HashMap<String, FileEntry> = HashMap::new();
for batch in batches {
let paths = batch
.column_by_name("relative_path")
.ok_or_else(|| anyhow!("missing relative_path column"))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| anyhow!("relative_path not StringArray"))?;
let compressed_col = batch
.column_by_name("compressed")
.ok_or_else(|| anyhow!("missing compressed column"))?
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| anyhow!("compressed not BooleanArray"))?;
let sizes = batch
.column_by_name("uncompressed_size")
.ok_or_else(|| anyhow!("missing uncompressed_size column"))?
.as_primitive_opt::<UInt64Type>()
.ok_or_else(|| anyhow!("uncompressed_size not UInt64Array"))?;
let blob_offset_col = batch
.column_by_name("blob_offset")
.ok_or_else(|| anyhow!("missing blob_offset column"))?
.as_primitive_opt::<UInt64Type>()
.ok_or_else(|| anyhow!("blob_offset not UInt64Array"))?;
let blob_size_col = batch
.column_by_name("blob_size")
.ok_or_else(|| anyhow!("missing blob_size column"))?
.as_primitive_opt::<UInt64Type>()
.ok_or_else(|| anyhow!("blob_size not UInt64Array"))?;
let fdata_offset_col = batch
.column_by_name("fdata_offset")
.ok_or_else(|| anyhow!("missing fdata_offset column"))?
.as_primitive_opt::<UInt64Type>()
.ok_or_else(|| anyhow!("fdata_offset not UInt64Array"))?;
for row in 0..batch.num_rows() {
let path = paths.value(row).to_string();
let compressed = compressed_col.value(row);
let uncompressed_size = sizes.value(row);
let blob_offset = blob_offset_col.value(row);
let blob_size = blob_size_col.value(row);
let fdata_offset = fdata_offset_col.value(row);
let entry = index.entry(path).or_insert_with(|| FileEntry {
uncompressed_size: 0,
chunks: Vec::new(),
});
entry.uncompressed_size += uncompressed_size;
entry.chunks.push(ChunkInfo {
blob_offset,
blob_size,
fdata_offset,
compressed,
});
}
}
for entry in index.values_mut() {
entry.chunks.sort_by_key(|c| c.fdata_offset);
}
Ok(index)
}
impl ZnippyReader for IcebergZnippyReader {
    /// Returns the relative path of every indexed file. The order follows the
    /// hash map's iteration order and is therefore unspecified.
    fn list_files(&self) -> Result<Vec<String>> {
        Ok(self.file_index.keys().cloned().collect())
    }

    /// Reassembles one file by reading each of its chunks from the sidecar, in
    /// index order, and decompressing the chunks flagged as compressed.
    ///
    /// Sizes come from the index and are not trusted blindly: they are
    /// converted to `usize` with a checked conversion, and the output buffer is
    /// reserved fallibly, so a corrupt index yields an error rather than a
    /// silently truncated length or an allocation abort.
    ///
    /// # Errors
    /// Fails if the path is not in the index, a size does not fit in `usize`,
    /// the output buffer cannot be reserved, a sidecar read comes up short or
    /// fails, or decompression fails.
    fn extract_file(&self, relative_path: &str) -> Result<Vec<u8>> {
        let entry = self
            .file_index
            .get(relative_path)
            .ok_or_else(|| anyhow!("file not found in archive: {}", relative_path))?;

        let expected_len = usize::try_from(entry.uncompressed_size).with_context(|| {
            format!(
                "uncompressed size {} of '{relative_path}' does not fit in usize",
                entry.uncompressed_size
            )
        })?;
        let mut result: Vec<u8> = Vec::new();
        result
            .try_reserve_exact(expected_len)
            .with_context(|| format!("reserving {expected_len} bytes for '{relative_path}'"))?;

        // Scratch buffers are reused across chunks to avoid per-chunk allocation.
        let mut blob: Vec<u8> = Vec::new();
        let mut decomp = Vec::new();
        for chunk in &entry.chunks {
            let blob_len = usize::try_from(chunk.blob_size).with_context(|| {
                format!(
                    "blob size {} of '{relative_path}' does not fit in usize",
                    chunk.blob_size
                )
            })?;
            blob.resize(blob_len, 0);
            self.sidecar
                .read_exact_at(&mut blob, chunk.blob_offset)
                .with_context(|| {
                    format!(
                        "reading {blob_len} bytes at sidecar offset {} for '{relative_path}'",
                        chunk.blob_offset
                    )
                })?;

            if chunk.compressed {
                // NOTE(review): relies on `codec::decompress_into` replacing, not
                // appending to, the contents of `decomp` — confirm.
                codec::decompress_into(&blob, &mut decomp)?;
                result.extend_from_slice(&decomp);
            } else {
                result.extend_from_slice(&blob);
            }
        }
        Ok(result)
    }

    /// Reports whether `relative_path` is present in the index.
    fn contains(&self, relative_path: &str) -> bool {
        self.file_index.contains_key(relative_path)
    }

    /// Returns the summed uncompressed size recorded for `relative_path`, or
    /// `None` when the path is not in the index.
    fn file_size(&self, relative_path: &str) -> Option<u64> {
        self.file_index
            .get(relative_path)
            .map(|entry| entry.uncompressed_size)
    }
}
fn lower_fixed_binary(batch: &RecordBatch57) -> Result<RecordBatch57> {
let schema = batch.schema();
if !schema
.fields()
.iter()
.any(|f| matches!(f.data_type(), DataType57::FixedSizeBinary(_)))
{
return Ok(batch.clone());
}
let mut fields = Vec::with_capacity(schema.fields().len());
let mut columns = Vec::with_capacity(schema.fields().len());
for (i, field) in schema.fields().iter().enumerate() {
if matches!(field.data_type(), DataType57::FixedSizeBinary(_)) {
let casted = arrow_cast::cast(batch.column(i), &DataType57::LargeBinary)
.map_err(|e| anyhow!("cast checksum to LargeBinary: {e}"))?;
fields.push(Field57::new(field.name(), DataType57::LargeBinary, field.is_nullable()));
columns.push(casted);
} else {
fields.push(field.as_ref().clone());
columns.push(batch.column(i).clone());
}
}
RecordBatch57::try_new(Arc::new(Schema57::new(fields)), columns)
.map_err(|e| anyhow!("rebuild after fixed-binary lowering: {e}"))
}
/// Decodes an Arrow IPC stream held in `bytes` into record batches using the
/// `arrow_ipc` stream reader.
///
/// # Errors
/// Fails if the stream header cannot be read or any batch fails to decode;
/// decoding stops at the first failing batch.
fn read_ipc_as_arrow57(bytes: &[u8]) -> Result<Vec<RecordBatch57>> {
    let cursor = std::io::Cursor::new(bytes);
    arrow_ipc::reader::StreamReader::try_new(cursor, None)
        .map_err(|e| anyhow!("ipc read (arrow57): {e}"))?
        .map(|decoded| decoded.map_err(|e| anyhow!("ipc batch decode: {e}")))
        .collect()
}
/// Builds the `namespace.table` name for the sub-index at position `index`.
///
/// The table part is `idx_NNN[_repo][_module]_index`, where `NNN` is the low
/// eight bits of the key's package type, zero-padded to three digits. The
/// namespace, repo and module are sanitized: every character that is not ASCII
/// alphanumeric becomes `_`, and leading/trailing `_` are trimmed. Repo and
/// module parts that are empty after sanitizing are left out.
fn qualified_table_name(namespace: &str, key: &GroupKey, index: usize) -> String {
    /// Maps non-ASCII-alphanumeric characters to `_` and trims `_` at both ends.
    fn sanitize(raw: &str) -> String {
        let mut cleaned = String::with_capacity(raw.len());
        for c in raw.chars() {
            cleaned.push(if c.is_ascii_alphanumeric() { c } else { '_' });
        }
        cleaned.trim_matches('_').to_string()
    }

    let ns = sanitize(namespace);
    let mut name = format!("idx_{:03}", (key.pkg_type as i16) & 0xff);
    for part in [sanitize(&key.repo), sanitize(&key.module_name)] {
        if part.is_empty() {
            continue;
        }
        name.push('_');
        name.push_str(&part);
    }
    format!("{ns}.{name}_{index}")
}
/// Returns the total size in bytes of everything below `root`.
///
/// This is a best-effort figure: directories that cannot be read and entries
/// whose metadata cannot be fetched are skipped, so a missing `root` yields 0.
///
/// Symbolic links are never followed. A link contributes only its own length,
/// which keeps a link to a directory (or a link cycle) from being descended
/// into and counted repeatedly. The walk uses an explicit work list instead of
/// recursion, so deep trees cannot exhaust the stack.
fn dir_size_bytes(root: &Path) -> u64 {
    let mut total = 0u64;
    let mut pending = vec![root.to_path_buf()];

    while let Some(dir) = pending.pop() {
        let entries = match std::fs::read_dir(&dir) {
            Ok(entries) => entries,
            Err(_) => continue,
        };
        for entry in entries.flatten() {
            match entry.file_type() {
                // `DirEntry::file_type` does not traverse symlinks, so only
                // real directories are queued.
                Ok(kind) if kind.is_dir() => pending.push(entry.path()),
                _ => {
                    if let Ok(meta) = entry.metadata() {
                        total = total.saturating_add(meta.len());
                    }
                }
            }
        }
    }
    total
}