use std::fs::File;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
use anyhow::{Result, anyhow};
use arrow::array::{
BooleanArray, BooleanBuilder, FixedSizeBinaryArray, FixedSizeBinaryBuilder, StringArray,
StringBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
};
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;
use crate::index::{
ChunkLoc, LOOKUP_MODULE, MULTI_INDEX_MAGIC, ManifestEntry, RESERVED_PKG_TYPE, TRIE_MODULE,
lookup_schema, write_manifest_bytes,
};
#[cfg(feature = "sign")]
use crate::index::{SIGN_ARCHIVE_MODULE, SIGN_ARTIFACTS_MODULE};
/// Identifies which manifest slot a sub-index (or raw section) is recorded under.
#[derive(Clone, Debug)]
pub struct GroupKey {
    /// Package-type discriminator; sections emitted by the sink itself use
    /// `RESERVED_PKG_TYPE`.
    pub pkg_type: i8,
    /// Repository name; the sink's own reserved sections leave this empty.
    pub repo: String,
    /// Module name stored in the manifest entry for this section.
    pub module_name: String,
}
/// Sink for the index/metadata sections written after an archive's blob data.
///
/// Sub-indexes are pushed one at a time; `finish` consumes the boxed sink and
/// writes whatever trailing structures the implementation needs.
pub trait ArchiveMetaSink {
    /// Appends one sub-index built from `batches` (expected to share `schema`)
    /// and records it under `key`.
    fn push_subindex(&mut self, schema: &Schema, batches: &[RecordBatch], key: GroupKey)
        -> Result<()>;

    /// Writes any trailing sections and consumes the sink.
    ///
    /// `ArrowIpcSink` returns the file offset just past the last byte it wrote.
    fn finish(self: Box<Self>) -> Result<u64>;
}
/// One-shot constructor for an [`ArchiveMetaSink`].
///
/// Invoked at most once with the archive file and a `u64` offset (presumably
/// where blob data ends, mirroring `ArrowIpcSink::new`'s `blob_end_offset` —
/// confirm at call sites).
pub type MetaSinkFactory =
    Box<dyn FnOnce(Arc<File>, u64) -> Box<dyn ArchiveMetaSink> + Send>;
/// [`ArchiveMetaSink`] that appends Arrow IPC sub-indexes, a path lookup
/// table, an fst trie and a manifest after the blob data of an archive file.
pub struct ArrowIpcSink {
    /// Archive file; every write goes through positional `write_all_at`.
    file: Arc<File>,
    /// Offset at which the next section will be written.
    cursor: u64,
    /// Manifest entries for every section written so far, in write order.
    entries: Vec<ManifestEntry>,
    /// `relative_path` of each accumulated lookup row (parallel to `lookup_locs`).
    lookup_paths: Vec<String>,
    /// Chunk location of each accumulated lookup row (parallel to `lookup_paths`).
    lookup_locs: Vec<ChunkLoc>,
    /// Optional signer; when present, `finish` emits signature sections.
    #[cfg(feature = "sign")]
    signer: Option<Box<dyn crate::sign::ArchiveSigner + Send>>,
}
impl ArrowIpcSink {
pub fn new(file: Arc<File>, blob_end_offset: u64) -> Self {
Self {
file,
cursor: blob_end_offset,
entries: Vec::new(),
lookup_paths: Vec::new(),
lookup_locs: Vec::new(),
#[cfg(feature = "sign")]
signer: None,
}
}
#[cfg(feature = "sign")]
pub fn with_signer(mut self, signer: Box<dyn crate::sign::ArchiveSigner + Send>) -> Self {
self.signer = Some(signer);
self
}
fn accumulate_lookup(&mut self, batch: &RecordBatch) {
let cols = (|| {
Some((
batch.column_by_name("relative_path")?.as_any().downcast_ref::<StringArray>()?,
batch.column_by_name("chunk_seq")?.as_any().downcast_ref::<UInt32Array>()?,
batch.column_by_name("fdata_offset")?.as_any().downcast_ref::<UInt64Array>()?,
batch.column_by_name("compressed")?.as_any().downcast_ref::<BooleanArray>()?,
batch.column_by_name("uncompressed_size")?.as_any().downcast_ref::<UInt64Array>()?,
batch.column_by_name("blob_offset")?.as_any().downcast_ref::<UInt64Array>()?,
batch.column_by_name("blob_size")?.as_any().downcast_ref::<UInt64Array>()?,
batch.column_by_name("checksum")?.as_any().downcast_ref::<FixedSizeBinaryArray>()?,
))
})();
let Some((paths, chunk_seq, fdata, compressed, usz, blob_off, blob_sz, checksum)) = cols
else { return; };
for i in 0..batch.num_rows() {
let mut ck = [0u8; 32];
ck.copy_from_slice(checksum.value(i));
self.lookup_paths.push(paths.value(i).to_string());
self.lookup_locs.push(ChunkLoc {
chunk_seq: chunk_seq.value(i),
fdata_offset: fdata.value(i),
blob_offset: blob_off.value(i),
blob_size: blob_sz.value(i),
uncompressed_size: usz.value(i),
compressed: compressed.value(i),
checksum: ck,
});
}
}
fn write_lookup_and_trie(&mut self) -> Result<()> {
let n = self.lookup_paths.len();
let mut order: Vec<usize> = (0..n).collect();
order.sort_by(|&a, &b| {
self.lookup_paths[a].cmp(&self.lookup_paths[b])
.then(self.lookup_locs[a].chunk_seq.cmp(&self.lookup_locs[b].chunk_seq))
});
let mut path_b = StringBuilder::with_capacity(n, n * 16);
let mut seq_b = UInt32Builder::with_capacity(n);
let mut fdata_b = UInt64Builder::with_capacity(n);
let mut comp_b = BooleanBuilder::with_capacity(n);
let mut usz_b = UInt64Builder::with_capacity(n);
let mut boff_b = UInt64Builder::with_capacity(n);
let mut bsz_b = UInt64Builder::with_capacity(n);
let mut ck_b = FixedSizeBinaryBuilder::with_capacity(n, 32);
for &i in &order {
let loc = &self.lookup_locs[i];
path_b.append_value(&self.lookup_paths[i]);
seq_b.append_value(loc.chunk_seq);
fdata_b.append_value(loc.fdata_offset);
comp_b.append_value(loc.compressed);
usz_b.append_value(loc.uncompressed_size);
boff_b.append_value(loc.blob_offset);
bsz_b.append_value(loc.blob_size);
ck_b.append_value(loc.checksum).expect("checksum is 32 bytes");
}
let schema = lookup_schema();
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(path_b.finish()),
Arc::new(seq_b.finish()),
Arc::new(fdata_b.finish()),
Arc::new(comp_b.finish()),
Arc::new(usz_b.finish()),
Arc::new(boff_b.finish()),
Arc::new(bsz_b.finish()),
Arc::new(ck_b.finish()),
],
)?;
self.push_subindex(&schema, &[batch], GroupKey {
pkg_type: RESERVED_PKG_TYPE,
repo: String::new(),
module_name: LOOKUP_MODULE.to_string(),
})?;
let mut builder = fst::MapBuilder::memory();
let mut prev: Option<&str> = None;
for (sorted_idx, &orig) in order.iter().enumerate() {
let p = self.lookup_paths[orig].as_str();
if prev != Some(p) {
builder.insert(p.as_bytes(), sorted_idx as u64)
.map_err(|e| anyhow!("trie insert: {e}"))?;
prev = Some(p);
}
}
let trie_bytes = builder.into_inner().map_err(|e| anyhow!("trie finish: {e}"))?;
self.write_raw_section(&trie_bytes, GroupKey {
pkg_type: RESERVED_PKG_TYPE,
repo: String::new(),
module_name: TRIE_MODULE.to_string(),
})
}
#[cfg(feature = "sign")]
fn write_signatures(&mut self) -> Result<()> {
use std::collections::BTreeMap;
let Some(signer) = self.signer.take() else {
return Ok(());
};
let (file_digests, artifact_paths, artifact_cms): (
Vec<(String, [u8; 32])>,
Vec<String>,
Vec<Vec<u8>>,
) = {
let mut by_path: BTreeMap<&str, Vec<(u32, &[u8; 32])>> = BTreeMap::new();
for (p, loc) in self.lookup_paths.iter().zip(self.lookup_locs.iter()) {
by_path.entry(p.as_str()).or_default().push((loc.chunk_seq, &loc.checksum));
}
let mut digs = Vec::with_capacity(by_path.len());
let mut paths = Vec::with_capacity(by_path.len());
let mut cmss = Vec::with_capacity(by_path.len());
for (path, mut chunks) in by_path {
chunks.sort_by_key(|(seq, _)| *seq);
let n = chunks.len();
let digest = crate::sign::file_digest_from_parts(
path,
chunks.iter().map(|(s, c)| (*s, *c)),
n,
);
let cms = signer.sign_digest(&digest)?;
digs.push((path.to_string(), digest));
paths.push(path.to_string());
cmss.push(cms);
}
(digs, paths, cmss)
};
let footer = crate::index::IndexFooter::Multi { manifest_offset: 0 };
let root = crate::sign::archive_root(&file_digests, &footer);
let archive_cms = signer.sign_digest(&root)?;
let artifacts_bytes = serialize_artifact_signatures(&artifact_paths, &artifact_cms)?;
self.write_raw_section(
&artifacts_bytes,
GroupKey {
pkg_type: RESERVED_PKG_TYPE,
repo: String::new(),
module_name: SIGN_ARTIFACTS_MODULE.to_string(),
},
)?;
self.write_raw_section(
&archive_cms,
GroupKey {
pkg_type: RESERVED_PKG_TYPE,
repo: String::new(),
module_name: SIGN_ARCHIVE_MODULE.to_string(),
},
)?;
Ok(())
}
fn write_raw_section(&mut self, bytes: &[u8], key: GroupKey) -> Result<()> {
let start = self.cursor;
self.file.write_all_at(bytes, start)?;
self.cursor += bytes.len() as u64;
self.entries.push(ManifestEntry {
pkg_type: key.pkg_type,
repo: key.repo,
module_name: key.module_name,
index_offset: start,
index_len: bytes.len() as u64,
row_count: 0,
});
Ok(())
}
}
impl ArchiveMetaSink for ArrowIpcSink {
    /// Encodes `batches` as one Arrow IPC stream, writes it at the cursor, and
    /// records a manifest entry for `key`.
    ///
    /// Batches of ordinary modules are also added to the path lookup table.
    /// The reserved `LOOKUP_MODULE` and `TRIE_MODULE` sections are excluded,
    /// so the sink's own output is never indexed into itself.
    ///
    /// # Errors
    /// Returns an error if IPC encoding fails or the positional write fails.
    /// Lookup rows are recorded only after the write succeeds. A failed call
    /// therefore leaves the cursor, manifest entries and lookup table unchanged.
    fn push_subindex(
        &mut self,
        schema: &Schema,
        batches: &[RecordBatch],
        key: GroupKey,
    ) -> Result<()> {
        let sub_start = self.cursor;
        let mut sub_bytes: Vec<u8> = Vec::new();
        let mut row_count = 0u64;
        {
            // Scope the writer so its borrow of `sub_bytes` ends here.
            let mut sw = StreamWriter::try_new(&mut sub_bytes, schema)
                .map_err(|e| anyhow!("sub-index writer: {e}"))?;
            for batch in batches {
                row_count += batch.num_rows() as u64;
                sw.write(batch).map_err(|e| anyhow!("sub-index write: {e}"))?;
            }
            sw.finish().map_err(|e| anyhow!("sub-index finish: {e}"))?;
        }
        let sub_len = sub_bytes.len() as u64;
        self.file.write_all_at(&sub_bytes, sub_start)?;
        self.cursor += sub_len;
        if key.module_name != LOOKUP_MODULE && key.module_name != TRIE_MODULE {
            for batch in batches {
                self.accumulate_lookup(batch);
            }
        }
        self.entries.push(ManifestEntry {
            pkg_type: key.pkg_type,
            repo: key.repo,
            module_name: key.module_name,
            index_offset: sub_start,
            index_len: sub_len,
            row_count,
        });
        Ok(())
    }

    /// Writes the lookup table, trie, optional signatures, manifest and
    /// trailer, then fsyncs the file.
    ///
    /// The trailer is `MULTI_INDEX_MAGIC` followed by the manifest offset as
    /// 8 little-endian bytes, placed directly after the manifest. Returns the
    /// offset just past the trailer.
    ///
    /// NOTE(review): the file is not truncated here. If it can already extend
    /// past the returned offset, the caller must truncate it.
    fn finish(mut self: Box<Self>) -> Result<u64> {
        self.write_lookup_and_trie()?;
        #[cfg(feature = "sign")]
        self.write_signatures()?;
        let manifest_offset = self.cursor;
        let manifest_bytes =
            write_manifest_bytes(&self.entries).map_err(|e| anyhow!("manifest: {e}"))?;
        self.file.write_all_at(&manifest_bytes, manifest_offset)?;
        let after = manifest_offset + manifest_bytes.len() as u64;
        self.file.write_all_at(&MULTI_INDEX_MAGIC, after)?;
        self.file.write_all_at(
            &manifest_offset.to_le_bytes(),
            after + MULTI_INDEX_MAGIC.len() as u64,
        )?;
        self.file.sync_all()?;
        Ok(after + MULTI_INDEX_MAGIC.len() as u64 + 8)
    }
}
/// Encodes per-artifact CMS signatures as a single-batch Arrow IPC stream.
///
/// The stream has two non-nullable columns: `relative_path` (Utf8) and `cms`
/// (Binary). Row `i` pairs `paths[i]` with `cms[i]`.
///
/// # Errors
/// Returns an error in these cases:
/// - `paths` and `cms` differ in length. The shorter list is not silently
///   truncated, since that would drop signatures.
/// - Batch construction fails.
/// - IPC encoding fails.
#[cfg(feature = "sign")]
fn serialize_artifact_signatures(paths: &[String], cms: &[Vec<u8>]) -> Result<Vec<u8>> {
    use arrow::array::BinaryBuilder;
    use arrow::datatypes::{DataType, Field, Schema};
    if paths.len() != cms.len() {
        return Err(anyhow!(
            "artifact-sig: {} paths but {} signatures",
            paths.len(),
            cms.len()
        ));
    }
    let n = paths.len();
    let schema = Arc::new(Schema::new(vec![
        Field::new("relative_path", DataType::Utf8, false),
        Field::new("cms", DataType::Binary, false),
    ]));
    // Size the value buffers exactly rather than guessing a per-row average.
    let path_bytes: usize = paths.iter().map(String::len).sum();
    let cms_bytes: usize = cms.iter().map(Vec::len).sum();
    let mut path_b = StringBuilder::with_capacity(n, path_bytes);
    let mut cms_b = BinaryBuilder::with_capacity(n, cms_bytes);
    for (p, c) in paths.iter().zip(cms.iter()) {
        path_b.append_value(p);
        cms_b.append_value(c);
    }
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(path_b.finish()), Arc::new(cms_b.finish())],
    )?;
    let mut buf = Vec::new();
    {
        let mut w = StreamWriter::try_new(&mut buf, &schema)
            .map_err(|e| anyhow!("artifact-sig writer: {e}"))?;
        w.write(&batch).map_err(|e| anyhow!("artifact-sig write: {e}"))?;
        w.finish().map_err(|e| anyhow!("artifact-sig finish: {e}"))?;
    }
    Ok(buf)
}