use crate::error::{ParxError, Result};
use crate::format::{BundleHeader, Trailer, BUNDLE_HEADER_SIZE, BUNDLE_MAGIC, TRAILER_SIZE};
use crate::proto::{BundleEntry, ParxBundle};
use bytes::Bytes;
use prost::Message;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
/// Conventional filename for a parx bundle placed alongside the Parquet files it indexes.
pub const BUNDLE_FILENAME: &str = "_parx_bundle.parx";
/// Owned data for one bundle entry, buffered by [`ParxBundleWriter`] until
/// `finish` serializes the whole bundle.
#[derive(Debug, Clone)]
pub struct BundleEntryData {
    /// Path of the source Parquet file this entry describes.
    pub parquet_path: String,
    /// Size in bytes of the source Parquet file.
    pub source_size: u64,
    /// Raw Parquet footer bytes captured from the source file.
    pub footer_bytes: Bytes,
    /// Raw page-index bytes; empty means "no page indexes for this entry".
    pub page_index_bytes: Bytes,
}
/// Accumulates bundle entries in memory and serializes them into the on-disk
/// bundle layout: fixed header, concatenated payloads, protobuf manifest, trailer.
#[derive(Debug)]
pub struct ParxBundleWriter {
    // Payload bytes are emitted in insertion order of these entries.
    entries: Vec<BundleEntryData>,
}
impl ParxBundleWriter {
    /// Creates an empty writer with no entries.
    pub const fn new() -> Self {
        Self {
            entries: Vec::new(),
        }
    }

    /// Records a footer-only entry (no page indexes) for `parquet_path`.
    pub fn add_entry(
        &mut self,
        parquet_path: &str,
        source_size: u64,
        footer_bytes: impl Into<Bytes>,
    ) {
        self.add_entry_with_page_indexes(parquet_path, source_size, footer_bytes, Bytes::new());
    }

    /// Records an entry carrying both footer and page-index payloads.
    /// Passing empty `page_index_bytes` is equivalent to [`Self::add_entry`].
    pub fn add_entry_with_page_indexes(
        &mut self,
        parquet_path: &str,
        source_size: u64,
        footer_bytes: impl Into<Bytes>,
        page_index_bytes: impl Into<Bytes>,
    ) {
        let record = BundleEntryData {
            parquet_path: parquet_path.to_string(),
            source_size,
            footer_bytes: footer_bytes.into(),
            page_index_bytes: page_index_bytes.into(),
        };
        self.entries.push(record);
    }

    /// Number of entries recorded so far.
    #[inline]
    pub fn entry_count(&self) -> usize {
        self.entries.len()
    }

    /// Serializes the bundle and returns the complete file contents:
    /// header, then per-entry payloads (footer followed by its optional page
    /// index, in insertion order), then the protobuf manifest, then a trailer
    /// carrying the manifest length and CRC32C.
    ///
    /// # Panics
    ///
    /// Panics if the encoded manifest exceeds `u32::MAX` bytes.
    pub fn finish(self) -> Vec<u8> {
        let header_bytes = BundleHeader::new(self.entries.len() as u64).to_bytes();
        // Offsets recorded in the manifest are absolute file offsets, so the
        // cursor starts just past the fixed-size header.
        let mut cursor = BUNDLE_HEADER_SIZE as u64;
        let mut payload = Vec::new();
        let mut manifest_entries = Vec::with_capacity(self.entries.len());
        for record in &self.entries {
            let mut entry = BundleEntry {
                parquet_path: record.parquet_path.clone(),
                source_size: record.source_size,
                footer_offset: cursor,
                footer_length: record.footer_bytes.len() as u64,
                footer_checksum: crc32c::crc32c(&record.footer_bytes).to_le_bytes().to_vec(),
                page_index_offset: 0,
                page_index_length: 0,
                page_index_checksum: Vec::new(),
            };
            payload.extend_from_slice(&record.footer_bytes);
            cursor += entry.footer_length;
            // Page indexes are optional; zero offset/length/checksum marks absence.
            if !record.page_index_bytes.is_empty() {
                entry.page_index_offset = cursor;
                entry.page_index_length = record.page_index_bytes.len() as u64;
                entry.page_index_checksum = crc32c::crc32c(&record.page_index_bytes)
                    .to_le_bytes()
                    .to_vec();
                payload.extend_from_slice(&record.page_index_bytes);
                cursor += entry.page_index_length;
            }
            manifest_entries.push(entry);
        }
        // Timestamp is best-effort: a clock before the Unix epoch degrades to 0.
        #[allow(clippy::cast_possible_truncation)]
        let created_at_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| d.as_millis() as u64);
        let manifest_bytes = ParxBundle {
            version: 1,
            created_at_ms,
            entries: manifest_entries,
        }
        .encode_to_vec();
        let manifest_len = u32::try_from(manifest_bytes.len()).expect("manifest too large (>4GB)");
        let trailer_bytes =
            Trailer::new(manifest_len, crc32c::crc32c(&manifest_bytes), BUNDLE_MAGIC).to_bytes();
        let mut output = Vec::with_capacity(
            header_bytes.len() + payload.len() + manifest_bytes.len() + trailer_bytes.len(),
        );
        output.extend_from_slice(&header_bytes);
        output.extend_from_slice(&payload);
        output.extend_from_slice(&manifest_bytes);
        output.extend_from_slice(&trailer_bytes);
        output
    }
}
impl Default for ParxBundleWriter {
fn default() -> Self {
Self::new()
}
}
/// Borrowed view of a single bundle entry; slices point into the reader's buffer.
#[derive(Debug, Clone)]
pub struct BundleEntryRef<'a> {
    /// Path of the source Parquet file.
    pub parquet_path: &'a str,
    /// Recorded size in bytes of the source Parquet file.
    pub source_size: u64,
    /// Raw footer bytes sliced from the bundle payload.
    pub footer_bytes: &'a [u8],
    /// Page-index bytes, or `None` when the entry recorded no page indexes.
    pub page_index_bytes: Option<&'a [u8]>,
}
/// Reader over a complete in-memory bundle file; payload accessors return
/// slices borrowed from the underlying buffer rather than copies.
#[derive(Debug, Clone)]
pub struct ParxBundleReader {
    // Parsed fixed-size header from the start of the file.
    header: BundleHeader,
    // Decoded protobuf manifest.
    bundle: ParxBundle,
    // The entire bundle file; payload slices borrow from it.
    data: Bytes,
    // parquet_path -> index into `bundle.entries`, for O(1) lookups.
    path_index: HashMap<String, usize>,
}
impl ParxBundleReader {
    /// Parses and structurally validates a bundle held fully in memory.
    ///
    /// Checks, in order: minimum file size, header magic and version, trailer
    /// magic, manifest bounds, and the manifest CRC32C before decoding the
    /// protobuf manifest. Per-entry payload checksums are NOT verified here;
    /// call [`Self::validate_all`] for that.
    ///
    /// # Errors
    ///
    /// Returns a `ParxError` describing the first structural problem found.
    pub fn open(data: Bytes) -> Result<Self> {
        let file_size = data.len();
        let min_size = BUNDLE_HEADER_SIZE + TRAILER_SIZE;
        if file_size < min_size {
            return Err(ParxError::FileTooSmall {
                size: file_size,
                minimum: min_size,
            });
        }
        let header_bytes: [u8; BUNDLE_HEADER_SIZE] = data[..BUNDLE_HEADER_SIZE]
            .try_into()
            .expect("header slice length verified above");
        let header = BundleHeader::from_bytes(&header_bytes);
        if !header.is_magic_valid() {
            return Err(ParxError::InvalidBundleMagic(header.magic));
        }
        if !header.is_version_supported() {
            return Err(ParxError::UnsupportedVersion {
                major: header.version_major,
                minor: header.version_minor,
            });
        }
        let trailer_bytes: [u8; TRAILER_SIZE] = data[file_size - TRAILER_SIZE..]
            .try_into()
            .expect("trailer slice length verified above");
        let trailer = Trailer::from_bytes(&trailer_bytes);
        if !trailer.is_magic_valid(BUNDLE_MAGIC) {
            return Err(ParxError::InvalidBundleMagic(trailer.magic));
        }
        // The manifest sits immediately before the trailer; a manifest_len
        // larger than the space before the trailer underflows and is rejected.
        let manifest_end = file_size - TRAILER_SIZE;
        let manifest_start = manifest_end
            .checked_sub(trailer.manifest_len as usize)
            .ok_or(ParxError::FileTooSmall {
                size: file_size,
                minimum: min_size + trailer.manifest_len as usize,
            })?;
        let manifest_bytes = &data[manifest_start..manifest_end];
        let actual_crc = crc32c::crc32c(manifest_bytes);
        if actual_crc != trailer.manifest_crc32c {
            return Err(ParxError::ManifestChecksumMismatch {
                expected: trailer.manifest_crc32c,
                actual: actual_crc,
            });
        }
        let bundle = ParxBundle::decode(manifest_bytes)?;
        // Index paths up front for O(1) lookups. On duplicate paths the later
        // entry wins (plain HashMap insert semantics).
        let path_index: HashMap<String, usize> = bundle
            .entries
            .iter()
            .enumerate()
            .map(|(i, e)| (e.parquet_path.clone(), i))
            .collect();
        Ok(Self {
            header,
            bundle,
            data,
            path_index,
        })
    }

    /// Returns the parsed fixed-size bundle header.
    #[inline]
    pub const fn header(&self) -> &BundleHeader {
        &self.header
    }

    /// Number of entries in the decoded manifest.
    #[inline]
    pub fn entry_count(&self) -> usize {
        self.bundle.entries.len()
    }

    /// Creation timestamp recorded in the manifest (milliseconds since the Unix epoch).
    #[inline]
    pub const fn created_at_ms(&self) -> u64 {
        self.bundle.created_at_ms
    }

    /// Whether the bundle contains an entry for `parquet_path`.
    #[inline]
    pub fn contains(&self, parquet_path: &str) -> bool {
        self.path_index.contains_key(parquet_path)
    }

    /// All entry paths, in manifest order.
    pub fn parquet_paths(&self) -> Vec<&str> {
        self.bundle
            .entries
            .iter()
            .map(|e| e.parquet_path.as_str())
            .collect()
    }

    /// Borrows `length` bytes of bundle data starting at `offset`.
    ///
    /// Returns `None` when either value does not fit in `usize`, the sum
    /// overflows, or the range extends past the end of the file.
    fn slice_at(&self, offset: u64, length: u64) -> Option<&[u8]> {
        let start = usize::try_from(offset).ok()?;
        let length = usize::try_from(length).ok()?;
        let end = start
            .checked_add(length)
            .filter(|&end| end <= self.data.len())?;
        Some(&self.data[start..end])
    }

    /// Like [`Self::slice_at`], but maps an invalid range to
    /// `ParxError::InvalidPayloadBounds` for use in validation paths.
    fn payload_slice(&self, offset: u64, length: u64) -> Result<&[u8]> {
        self.slice_at(offset, length)
            .ok_or_else(|| ParxError::InvalidPayloadBounds {
                offset,
                length,
                file_size: self.data.len() as u64,
            })
    }

    /// Returns the raw footer bytes for `parquet_path`, or `None` when the
    /// path is unknown or the recorded bounds fall outside the file.
    pub fn get_footer(&self, parquet_path: &str) -> Option<&[u8]> {
        let idx = *self.path_index.get(parquet_path)?;
        let entry = &self.bundle.entries[idx];
        self.slice_at(entry.footer_offset, entry.footer_length)
    }

    /// Recorded size of the source Parquet file, if the entry exists.
    pub fn get_source_size(&self, parquet_path: &str) -> Option<u64> {
        let idx = *self.path_index.get(parquet_path)?;
        Some(self.bundle.entries[idx].source_size)
    }

    /// True only when the entry exists AND its recorded size equals `actual_size`.
    pub fn validate_source_size(&self, parquet_path: &str, actual_size: u64) -> bool {
        self.get_source_size(parquet_path)
            .is_some_and(|expected| expected == actual_size)
    }

    /// Builds a borrowed view over one manifest entry; `None` when any
    /// recorded byte range is invalid.
    fn entry_ref<'a>(&'a self, entry: &'a BundleEntry) -> Option<BundleEntryRef<'a>> {
        let footer_bytes = self.slice_at(entry.footer_offset, entry.footer_length)?;
        let page_index_bytes = self.resolve_page_index_bytes(entry)?;
        Some(BundleEntryRef {
            parquet_path: &entry.parquet_path,
            source_size: entry.source_size,
            footer_bytes,
            page_index_bytes,
        })
    }

    /// Returns a borrowed view of the entry for `parquet_path`, or `None`
    /// when the path is unknown or any recorded byte range is invalid.
    pub fn get_entry(&self, parquet_path: &str) -> Option<BundleEntryRef<'_>> {
        let idx = *self.path_index.get(parquet_path)?;
        self.entry_ref(&self.bundle.entries[idx])
    }

    /// Iterates over all entries in manifest order, silently skipping any
    /// whose recorded byte ranges are invalid (use [`Self::validate_all`] to
    /// surface those as errors instead).
    pub fn iter_entries(&self) -> impl Iterator<Item = BundleEntryRef<'_>> {
        self.bundle
            .entries
            .iter()
            .filter_map(|entry| self.entry_ref(entry))
    }

    /// Outer `None` = invalid bounds; inner `None` = entry has no page index.
    fn resolve_page_index_bytes<'a>(&'a self, entry: &BundleEntry) -> Option<Option<&'a [u8]>> {
        if entry.page_index_length == 0 {
            return Some(None);
        }
        self.slice_at(entry.page_index_offset, entry.page_index_length)
            .map(Some)
    }

    /// Verifies bounds and CRC32C checksums of every footer and page-index
    /// payload in the bundle.
    ///
    /// # Errors
    ///
    /// `InvalidPayloadBounds` when a recorded range falls outside the file,
    /// `FooterChecksumMismatch` / `PageIndexChecksumMismatch` when a payload
    /// does not match its stored checksum.
    pub fn validate_all(&self) -> Result<()> {
        for entry in &self.bundle.entries {
            let footer_bytes = self.payload_slice(entry.footer_offset, entry.footer_length)?;
            let footer_crc = crc32c::crc32c(footer_bytes);
            if footer_crc.to_le_bytes().as_slice() != entry.footer_checksum.as_slice() {
                return Err(ParxError::FooterChecksumMismatch);
            }
            if entry.page_index_length > 0 {
                let page_index_bytes =
                    self.payload_slice(entry.page_index_offset, entry.page_index_length)?;
                let page_index_crc = crc32c::crc32c(page_index_bytes);
                if page_index_crc.to_le_bytes().as_slice() != entry.page_index_checksum.as_slice() {
                    return Err(ParxError::PageIndexChecksumMismatch);
                }
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Write three footer-only entries and read every accessor back.
    #[test]
    fn test_bundle_roundtrip() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("part-00000.parquet", 1000, b"footer0".to_vec());
        writer.add_entry("part-00001.parquet", 2000, b"footer1".to_vec());
        writer.add_entry("part-00002.parquet", 3000, b"footer2".to_vec());
        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");
        assert_eq!(reader.entry_count(), 3);
        // Membership checks, including a negative case.
        assert!(reader.contains("part-00000.parquet"));
        assert!(reader.contains("part-00001.parquet"));
        assert!(reader.contains("part-00002.parquet"));
        assert!(!reader.contains("nonexistent.parquet"));
        // Footers come back byte-for-byte.
        assert_eq!(
            reader.get_footer("part-00000.parquet"),
            Some(b"footer0".as_slice())
        );
        assert_eq!(
            reader.get_footer("part-00001.parquet"),
            Some(b"footer1".as_slice())
        );
        assert_eq!(
            reader.get_footer("part-00002.parquet"),
            Some(b"footer2".as_slice())
        );
        // Source sizes round-trip, and validation distinguishes right from wrong.
        assert_eq!(reader.get_source_size("part-00000.parquet"), Some(1000));
        assert_eq!(reader.get_source_size("part-00001.parquet"), Some(2000));
        assert!(reader.validate_source_size("part-00000.parquet", 1000));
        assert!(!reader.validate_source_size("part-00000.parquet", 9999));
    }

    // Entries iterate in insertion order; footer-only entries have no page index.
    #[test]
    fn test_bundle_iter_entries() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("a.parquet", 100, b"fa".to_vec());
        writer.add_entry("b.parquet", 200, b"fb".to_vec());
        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");
        let entries: Vec<_> = reader.iter_entries().collect();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].parquet_path, "a.parquet");
        assert_eq!(entries[0].footer_bytes, b"fa");
        assert!(entries[0].page_index_bytes.is_none());
        assert_eq!(entries[1].parquet_path, "b.parquet");
        assert_eq!(entries[1].footer_bytes, b"fb");
        assert!(entries[1].page_index_bytes.is_none());
    }

    // A freshly written bundle passes full checksum validation.
    #[test]
    fn test_bundle_validate_all() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("test.parquet", 1000, b"footer".to_vec());
        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");
        reader.validate_all().expect("validation should pass");
    }

    // parquet_paths preserves manifest (insertion) order, not sorted order.
    #[test]
    fn test_bundle_parquet_paths() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("z.parquet", 100, b"f".to_vec());
        writer.add_entry("a.parquet", 200, b"f".to_vec());
        writer.add_entry("m.parquet", 300, b"f".to_vec());
        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");
        let paths = reader.parquet_paths();
        assert_eq!(paths, vec!["z.parquet", "a.parquet", "m.parquet"]);
    }

    // A bad magic in the first four bytes is rejected before anything else.
    #[test]
    fn test_invalid_bundle_magic() {
        let mut data = vec![0u8; 100];
        data[0..4].copy_from_slice(b"NOPE");
        let result = ParxBundleReader::open(Bytes::from(data));
        assert!(matches!(result, Err(ParxError::InvalidBundleMagic(_))));
    }

    // Corrupting a byte near the end of the file must trip the manifest CRC check.
    // NOTE(review): `len - 8` assumes this byte lands in a CRC-relevant region
    // (manifest tail or stored CRC) given the trailer layout — confirm against
    // TRAILER_SIZE if the trailer format ever changes.
    #[test]
    fn test_bundle_manifest_crc_mismatch() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("test.parquet", 1000, b"footer".to_vec());
        let mut bundle_bytes = writer.finish();
        let len = bundle_bytes.len();
        bundle_bytes[len - 8] ^= 0xFF;
        let result = ParxBundleReader::open(Bytes::from(bundle_bytes));
        assert!(matches!(
            result,
            Err(ParxError::ManifestChecksumMismatch { .. })
        ));
    }

    // Basic happy path: a footer stored via the writer is retrievable unchanged.
    #[test]
    fn test_bundle_reader_handles_normal_case() {
        let mut writer = ParxBundleWriter::new();
        let footer = vec![1, 2, 3, 4];
        writer.add_entry("test.parquet", 1000, footer.clone());
        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(bundle_bytes.into()).expect("Should open valid bundle");
        let retrieved = reader
            .get_footer("test.parquet")
            .expect("Should find footer");
        assert_eq!(retrieved, &footer[..]);
    }

    // Entries with page indexes round-trip both payloads and validate cleanly.
    #[test]
    fn test_bundle_with_page_indexes_roundtrip() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry_with_page_indexes(
            "test.parquet",
            1000,
            b"footer".to_vec(),
            b"pi".to_vec(),
        );
        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(bundle_bytes.into()).expect("Should open valid bundle");
        let entry = reader
            .get_entry("test.parquet")
            .expect("Should find bundle entry");
        assert_eq!(entry.footer_bytes, b"footer");
        assert_eq!(entry.page_index_bytes, Some(b"pi".as_slice()));
        reader.validate_all().expect("validation should pass");
    }
}