use std::fs::File;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
use anyhow::{Result, anyhow};
use arrow::array::{
BooleanArray, BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, StringArray,
StringBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
};
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;
use crate::index::{
ChunkLoc, LOOKUP_MODULE, MULTI_INDEX_MAGIC, ManifestEntry, RESERVED_PKG_TYPE, TRIE_MODULE,
lookup_schema, write_manifest_bytes,
};
/// Identifies one section (sub-index) recorded in the archive manifest.
///
/// The three fields are copied verbatim into the matching `ManifestEntry`
/// fields when a section is written. The Arrow IPC sink in this module keys its
/// own internal sections (lookup table and path trie) with the reserved package
/// type and an empty `repo`.
///
/// `Eq`/`Hash` are derived so keys can be compared and used to group work in
/// hash maps or sets.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GroupKey {
    /// Package-type tag stored in the manifest entry.
    pub pkg_type: i8,
    /// Repository name; empty for the sink's internal sections.
    pub repo: String,
    /// Module name of the section.
    pub module_name: String,
}
/// Receives sub-indexes keyed by [`GroupKey`] and finalizes them in one step.
pub trait ArchiveMetaSink {
    /// Stores one sub-index made of `batches` (all conforming to `schema`)
    /// under `key`.
    ///
    /// # Errors
    /// Implementation-defined; `ArrowIpcSink` fails when IPC encoding or the
    /// underlying positioned write fails.
    fn push_subindex(
        &mut self,
        schema: &Schema,
        batches: &[RecordBatch],
        key: GroupKey,
    ) -> Result<()>;

    /// Consumes the sink and completes everything it still has to write.
    ///
    /// The meaning of the returned `u64` is up to the implementation;
    /// `ArrowIpcSink` returns the file offset just past the last byte it wrote.
    fn finish(self: Box<Self>) -> Result<u64>;
}
/// [`ArchiveMetaSink`] that encodes every sub-index as an Arrow IPC stream and
/// appends it to a shared file using positioned (`write_all_at`) writes.
pub struct ArrowIpcSink {
    /// Destination file, shared via `Arc`; only written with explicit offsets.
    file: Arc<File>,
    /// File offset at which the next section will be placed.
    cursor: u64,
    /// One manifest entry per section written so far, in write order.
    entries: Vec<ManifestEntry>,
    /// `relative_path` of each accumulated lookup row (parallel to `lookup_locs`).
    lookup_paths: Vec<String>,
    /// Chunk location of each accumulated lookup row (parallel to `lookup_paths`).
    lookup_locs: Vec<ChunkLoc>,
}
impl ArrowIpcSink {
pub fn new(file: Arc<File>, blob_end_offset: u64) -> Self {
Self {
file,
cursor: blob_end_offset,
entries: Vec::new(),
lookup_paths: Vec::new(),
lookup_locs: Vec::new(),
}
}
fn accumulate_lookup(&mut self, batch: &RecordBatch) {
let cols = (|| {
Some((
batch.column_by_name("relative_path")?.as_any().downcast_ref::<StringArray>()?,
batch.column_by_name("chunk_seq")?.as_any().downcast_ref::<UInt32Array>()?,
batch.column_by_name("fdata_offset")?.as_any().downcast_ref::<UInt64Array>()?,
batch.column_by_name("compressed")?.as_any().downcast_ref::<BooleanArray>()?,
batch.column_by_name("uncompressed_size")?.as_any().downcast_ref::<UInt64Array>()?,
batch.column_by_name("blob_offset")?.as_any().downcast_ref::<UInt64Array>()?,
batch.column_by_name("blob_size")?.as_any().downcast_ref::<UInt64Array>()?,
batch.column_by_name("checksum")?.as_any().downcast_ref::<FixedSizeBinaryArray>()?,
))
})();
let Some((paths, chunk_seq, fdata, compressed, usz, blob_off, blob_sz, checksum)) = cols
else { return; };
for i in 0..batch.num_rows() {
let mut ck = [0u8; 32];
ck.copy_from_slice(checksum.value(i));
self.lookup_paths.push(paths.value(i).to_string());
self.lookup_locs.push(ChunkLoc {
chunk_seq: chunk_seq.value(i),
fdata_offset: fdata.value(i),
blob_offset: blob_off.value(i),
blob_size: blob_sz.value(i),
uncompressed_size: usz.value(i),
compressed: compressed.value(i),
checksum: ck,
});
}
}
fn write_lookup_and_trie(&mut self) -> Result<()> {
let n = self.lookup_paths.len();
let mut order: Vec<usize> = (0..n).collect();
order.sort_by(|&a, &b| {
self.lookup_paths[a].cmp(&self.lookup_paths[b])
.then(self.lookup_locs[a].chunk_seq.cmp(&self.lookup_locs[b].chunk_seq))
});
let mut path_b = StringBuilder::with_capacity(n, n * 16);
let mut seq_b = UInt32Builder::with_capacity(n);
let mut fdata_b = UInt64Builder::with_capacity(n);
let mut comp_b = BooleanBuilder::with_capacity(n);
let mut usz_b = UInt64Builder::with_capacity(n);
let mut boff_b = UInt64Builder::with_capacity(n);
let mut bsz_b = UInt64Builder::with_capacity(n);
let mut ck_b = FixedSizeBinaryBuilder::with_capacity(n, 32);
for &i in &order {
let loc = &self.lookup_locs[i];
path_b.append_value(&self.lookup_paths[i]);
seq_b.append_value(loc.chunk_seq);
fdata_b.append_value(loc.fdata_offset);
comp_b.append_value(loc.compressed);
usz_b.append_value(loc.uncompressed_size);
boff_b.append_value(loc.blob_offset);
bsz_b.append_value(loc.blob_size);
ck_b.append_value(loc.checksum).expect("checksum is 32 bytes");
}
let schema = lookup_schema();
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(path_b.finish()),
Arc::new(seq_b.finish()),
Arc::new(fdata_b.finish()),
Arc::new(comp_b.finish()),
Arc::new(usz_b.finish()),
Arc::new(boff_b.finish()),
Arc::new(bsz_b.finish()),
Arc::new(ck_b.finish()),
],
)?;
self.push_subindex(&schema, &[batch], GroupKey {
pkg_type: RESERVED_PKG_TYPE,
repo: String::new(),
module_name: LOOKUP_MODULE.to_string(),
})?;
let mut builder = fst::MapBuilder::memory();
let mut prev: Option<&str> = None;
for (sorted_idx, &orig) in order.iter().enumerate() {
let p = self.lookup_paths[orig].as_str();
if prev != Some(p) {
builder.insert(p.as_bytes(), sorted_idx as u64)
.map_err(|e| anyhow!("trie insert: {e}"))?;
prev = Some(p);
}
}
let trie_bytes = builder.into_inner().map_err(|e| anyhow!("trie finish: {e}"))?;
self.write_raw_section(&trie_bytes, GroupKey {
pkg_type: RESERVED_PKG_TYPE,
repo: String::new(),
module_name: TRIE_MODULE.to_string(),
})
}
fn write_raw_section(&mut self, bytes: &[u8], key: GroupKey) -> Result<()> {
let start = self.cursor;
self.file.write_all_at(bytes, start)?;
self.cursor += bytes.len() as u64;
self.entries.push(ManifestEntry {
pkg_type: key.pkg_type,
repo: key.repo,
module_name: key.module_name,
index_offset: start,
index_len: bytes.len() as u64,
row_count: 0,
});
Ok(())
}
}
impl ArchiveMetaSink for ArrowIpcSink {
    /// Encodes `batches` as one Arrow IPC stream, writes it at the cursor, and
    /// records a manifest entry (with the total row count) under `key`.
    ///
    /// Rows of ordinary sub-indexes are also collected for the global lookup
    /// table. Sections named after the lookup or trie modules are not.
    ///
    /// # Errors
    /// Fails if IPC encoding or the positioned write fails. In that case the
    /// cursor, the manifest and the lookup rows are all left unchanged.
    fn push_subindex(
        &mut self,
        schema: &Schema,
        batches: &[RecordBatch],
        key: GroupKey,
    ) -> Result<()> {
        let mut sub_bytes: Vec<u8> = Vec::new();
        let mut row_count = 0u64;
        {
            // Scoped so the writer's borrow of `sub_bytes` ends before it is read.
            let mut sw = StreamWriter::try_new(&mut sub_bytes, schema)
                .map_err(|e| anyhow!("sub-index writer: {e}"))?;
            for batch in batches {
                row_count += batch.num_rows() as u64;
                sw.write(batch).map_err(|e| anyhow!("sub-index write: {e}"))?;
            }
            sw.finish().map_err(|e| anyhow!("sub-index finish: {e}"))?;
        }

        // Decide eligibility now, because `key` is moved into the manifest entry.
        let feeds_lookup = key.module_name != LOOKUP_MODULE && key.module_name != TRIE_MODULE;

        let sub_start = self.cursor;
        let sub_len = sub_bytes.len() as u64;
        self.file.write_all_at(&sub_bytes, sub_start)?;
        self.cursor += sub_len;
        self.entries.push(ManifestEntry {
            pkg_type: key.pkg_type,
            repo: key.repo,
            module_name: key.module_name,
            index_offset: sub_start,
            index_len: sub_len,
            row_count,
        });

        // Accumulate only after the sub-index is safely written and recorded, so
        // a failed push never leaves orphan rows in the lookup table.
        if feeds_lookup {
            for batch in batches {
                self.accumulate_lookup(batch);
            }
        }
        Ok(())
    }

    /// Writes the lookup table and trie, then the manifest, then the trailer,
    /// and fsyncs the file.
    ///
    /// The trailer is `MULTI_INDEX_MAGIC` followed by the manifest's file
    /// offset as a little-endian `u64`. Returns the file offset just past the
    /// trailer.
    ///
    /// # Errors
    /// Fails if any section write, manifest serialization, or `sync_all` fails.
    fn finish(mut self: Box<Self>) -> Result<u64> {
        self.write_lookup_and_trie()?;
        let manifest_offset = self.cursor;
        let manifest_bytes =
            write_manifest_bytes(&self.entries).map_err(|e| anyhow!("manifest: {e}"))?;
        self.file.write_all_at(&manifest_bytes, manifest_offset)?;

        let magic_offset = manifest_offset + manifest_bytes.len() as u64;
        self.file.write_all_at(&MULTI_INDEX_MAGIC, magic_offset)?;

        let pointer_offset = magic_offset + MULTI_INDEX_MAGIC.len() as u64;
        let manifest_ptr = manifest_offset.to_le_bytes();
        self.file.write_all_at(&manifest_ptr, pointer_offset)?;

        self.file.sync_all()?;
        Ok(pointer_offset + manifest_ptr.len() as u64)
    }
}