use crate::compression;
use crate::error::{ParxError, Result};
use crate::format::{
Compression, Header, Trailer, HEADER_SIZE, MAGIC, MIN_FILE_SIZE, TRAILER_SIZE,
};
use crate::proto::ParxManifest;
use bytes::Bytes;
use prost::Message;
use std::ops::Range;
/// Backing storage for a payload section (footer or page index) read from
/// a PARX file.
///
/// Both variants hold `Bytes`; the split records provenance only:
/// `Borrowed` is a zero-copy slice of the caller's buffer (produced by
/// `ParxReader::open_bytes`), while `Owned` holds an independently
/// allocated buffer (copied by `ParxReader::open`, or the result of
/// decompression).
#[derive(Debug, Clone)]
enum Payload {
    // Zero-copy view into the caller-supplied `Bytes` buffer.
    Borrowed(Bytes),
    // Independently allocated buffer (copied or decompressed).
    Owned(Bytes),
}
impl Payload {
    /// Borrows the underlying byte slice, regardless of how it is backed.
    fn as_slice(&self) -> &[u8] {
        match self {
            Self::Borrowed(data) => data.as_ref(),
            Self::Owned(data) => data.as_ref(),
        }
    }
}
/// Reader for PARX files: validates the container and exposes the stored
/// Parquet footer (and optional page indexes) plus manifest metadata.
#[derive(Debug, Clone)]
pub struct ParxReader {
    // Fixed-size header parsed from the start of the file.
    header: Header,
    // Decoded protobuf manifest located via the trailer.
    manifest: ParxManifest,
    // Footer payload; decompressed when the header declares compression.
    footer_bytes: Payload,
    // Optional page-index payload; `None` when `page_index_length == 0`.
    page_index_bytes: Option<Payload>,
}
impl ParxReader {
    /// Opens a PARX file from a plain byte slice.
    ///
    /// Payload sections are copied into owned buffers; use
    /// [`ParxReader::open_bytes`] for zero-copy access.
    ///
    /// # Errors
    /// Returns a `ParxError` when the file is truncated, has an invalid
    /// magic/version, fails a checksum, or declares out-of-bounds sections.
    pub fn open(bytes: &[u8]) -> Result<Self> {
        Self::open_with_payload(bytes, |range| {
            Payload::Owned(Bytes::copy_from_slice(&bytes[range]))
        })
    }

    /// Opens a PARX file backed by a `Bytes` buffer.
    ///
    /// Uncompressed payload sections are zero-copy slices of `bytes`;
    /// compressed sections are decompressed into owned buffers.
    ///
    /// # Errors
    /// Same failure modes as [`ParxReader::open`].
    pub fn open_bytes(bytes: &Bytes) -> Result<Self> {
        Self::open_with_payload(bytes, |range| Payload::Borrowed(bytes.slice(range)))
    }

    /// Validates that `(offset, length)` (taken from the manifest) describes
    /// a section fully contained in `[lower_bound, upper_bound)` and returns
    /// it as a byte range.
    ///
    /// Conversion failure, arithmetic overflow, and out-of-bounds conditions
    /// all map to `ParxError::InvalidPayloadBounds`.
    fn section_range(
        offset: u64,
        length: u64,
        lower_bound: usize,
        upper_bound: usize,
        file_size: usize,
    ) -> Result<Range<usize>> {
        // Build the error lazily so the happy path does no extra work.
        let bounds_err = || ParxError::InvalidPayloadBounds {
            offset,
            length,
            file_size: file_size as u64,
        };
        let start = usize::try_from(offset).map_err(|_| bounds_err())?;
        let len = usize::try_from(length).map_err(|_| bounds_err())?;
        let end = start.checked_add(len).ok_or_else(bounds_err)?;
        if start < lower_bound || end > upper_bound {
            return Err(bounds_err());
        }
        Ok(start..end)
    }

    /// Returns true when the little-endian CRC32C of `data` equals the
    /// stored checksum bytes (a length mismatch simply compares unequal).
    fn crc_matches(data: &[u8], stored: &[u8]) -> bool {
        crc32c::crc32c(data).to_le_bytes().as_slice() == stored
    }

    /// Shared open path: validates header, trailer, and manifest, then
    /// materializes the footer (and optional page-index) payloads — via
    /// `make_payload` for uncompressed sections, or decompression otherwise.
    fn open_with_payload<F>(bytes: &[u8], make_payload: F) -> Result<Self>
    where
        F: Fn(Range<usize>) -> Payload,
    {
        let file_size = bytes.len();
        if file_size < MIN_FILE_SIZE {
            return Err(ParxError::FileTooSmall {
                size: file_size,
                minimum: MIN_FILE_SIZE,
            });
        }
        // Fixed-size header at the start of the file: magic + version gate.
        let header_bytes: [u8; HEADER_SIZE] = bytes[..HEADER_SIZE]
            .try_into()
            .expect("header slice length verified above");
        let header = Header::from_bytes(&header_bytes);
        if !header.is_magic_valid(MAGIC) {
            return Err(ParxError::InvalidMagic(header.magic));
        }
        if !header.is_version_supported() {
            return Err(ParxError::UnsupportedVersion {
                major: header.version_major,
                minor: header.version_minor,
            });
        }
        // Fixed-size trailer at the end of the file locates the manifest.
        let trailer_bytes: [u8; TRAILER_SIZE] = bytes[file_size - TRAILER_SIZE..]
            .try_into()
            .expect("trailer slice length verified above");
        let trailer = Trailer::from_bytes(&trailer_bytes);
        if !trailer.is_magic_valid(MAGIC) {
            return Err(ParxError::InvalidMagic(trailer.magic));
        }
        // The manifest sits immediately before the trailer.
        // NOTE(review): `as usize` truncates on 32-bit targets when
        // manifest_len > u32::MAX — consider usize::try_from; behavior
        // preserved here.
        let manifest_end = file_size - TRAILER_SIZE;
        let manifest_start = manifest_end
            .checked_sub(trailer.manifest_len as usize)
            .ok_or_else(|| ParxError::FileTooSmall {
                size: file_size,
                minimum: MIN_FILE_SIZE + trailer.manifest_len as usize,
            })?;
        let manifest_bytes = &bytes[manifest_start..manifest_end];
        // Verify manifest integrity before decoding it.
        let actual_crc = crc32c::crc32c(manifest_bytes);
        if actual_crc != trailer.manifest_crc32c {
            return Err(ParxError::ManifestChecksumMismatch {
                expected: trailer.manifest_crc32c,
                actual: actual_crc,
            });
        }
        let manifest = ParxManifest::decode(manifest_bytes)?;
        // Materializes a payload section: decompress into an owned buffer
        // when the header declares compression, otherwise delegate to
        // `make_payload` (copy or zero-copy depending on the entry point).
        let load_payload =
            |range: Range<usize>, uncompressed_size: u64, section: &str| -> Result<Payload> {
                if let Some(algo) = header.compression_algorithm() {
                    let size = usize::try_from(uncompressed_size).map_err(|_| {
                        ParxError::InvalidFormat(format!("{section} uncompressed size too large"))
                    })?;
                    Ok(Payload::Owned(Bytes::from(compression::decompress(
                        &bytes[range],
                        algo,
                        size,
                    )?)))
                } else {
                    Ok(make_payload(range))
                }
            };
        // Footer must live between the header and the manifest; checksums
        // are taken over the bytes as stored (i.e. before decompression).
        let footer_range = Self::section_range(
            manifest.footer_offset,
            manifest.footer_length,
            HEADER_SIZE,
            manifest_start,
            file_size,
        )?;
        if !Self::crc_matches(&bytes[footer_range.clone()], &manifest.footer_checksum) {
            return Err(ParxError::FooterChecksumMismatch);
        }
        let footer_payload = load_payload(
            footer_range.clone(),
            manifest.footer_uncompressed_size,
            "footer",
        )?;
        // Optional page indexes must live between the footer and the manifest.
        let page_index_bytes = if manifest.page_index_length > 0 {
            let range = Self::section_range(
                manifest.page_index_offset,
                manifest.page_index_length,
                footer_range.end,
                manifest_start,
                file_size,
            )?;
            if !Self::crc_matches(&bytes[range.clone()], &manifest.page_index_checksum) {
                return Err(ParxError::PageIndexChecksumMismatch);
            }
            Some(load_payload(
                range,
                manifest.page_index_uncompressed_size,
                "page index",
            )?)
        } else {
            None
        };
        Ok(Self {
            header,
            manifest,
            footer_bytes: footer_payload,
            page_index_bytes,
        })
    }

    /// Returns the parsed file header.
    #[inline]
    pub const fn header(&self) -> &Header {
        &self.header
    }

    /// Returns the decoded manifest.
    #[inline]
    pub const fn manifest(&self) -> &ParxManifest {
        &self.manifest
    }

    /// Returns the Parquet footer bytes (decompressed if stored compressed).
    #[inline]
    pub fn footer_bytes(&self) -> &[u8] {
        self.footer_bytes.as_slice()
    }

    /// Returns true when the header declares the footer as compressed.
    #[inline]
    pub const fn is_compressed(&self) -> bool {
        self.header.is_footer_compressed()
    }

    /// Returns the compression algorithm, or `None` for uncompressed files.
    #[inline]
    pub const fn compression_algorithm(&self) -> Option<Compression> {
        self.header.compression_algorithm()
    }

    /// Returns the uncompressed footer size recorded in the manifest.
    #[inline]
    pub const fn uncompressed_footer_size(&self) -> u64 {
        self.manifest.footer_uncompressed_size
    }

    /// Returns true when the file carries a page-index section.
    #[inline]
    pub const fn has_page_indexes(&self) -> bool {
        self.page_index_bytes.is_some()
    }

    /// Returns the page-index bytes (decompressed if stored compressed),
    /// or `None` when the file carries none.
    #[inline]
    pub fn page_index_bytes(&self) -> Option<&[u8]> {
        self.page_index_bytes.as_ref().map(Payload::as_slice)
    }

    /// Returns the uncompressed page-index size recorded in the manifest.
    #[inline]
    pub const fn uncompressed_page_index_size(&self) -> u64 {
        self.manifest.page_index_uncompressed_size
    }

    /// Returns true when `source_size` matches the size recorded in the
    /// manifest.
    #[inline]
    pub const fn validate_source_size(&self, source_size: u64) -> bool {
        self.manifest.source_size == source_size
    }

    /// Returns true when `original_footer` hashes (little-endian CRC32C)
    /// to the source-footer checksum recorded in the manifest.
    ///
    /// Returns false when the stored checksum is not exactly 4 bytes.
    pub fn validate_source_footer(&self, original_footer: &[u8]) -> bool {
        if self.manifest.source_footer_checksum.len() != 4 {
            return false;
        }
        Self::crc_matches(original_footer, &self.manifest.source_footer_checksum)
    }

    /// Returns the URI of the source Parquet file.
    #[inline]
    pub fn source_uri(&self) -> &str {
        &self.manifest.source_uri
    }

    /// Returns the source Parquet file size recorded in the manifest.
    #[inline]
    pub const fn source_size(&self) -> u64 {
        self.manifest.source_size
    }

    /// Returns the creation timestamp (milliseconds since the Unix epoch).
    #[inline]
    pub const fn created_at_ms(&self) -> u64 {
        self.manifest.created_at_ms
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::writer::ParxWriter;

    /// Write-then-read round trip over an uncompressed file, checking all
    /// manifest-backed accessors.
    #[test]
    fn test_roundtrip() {
        let footer_bytes = b"fake parquet footer data for testing";
        let source_size = 1024 * 1024;
        let mut writer = ParxWriter::new();
        writer.set_source_uri("s3://bucket/table/part-0000.parquet");
        writer.set_source_size(source_size);
        writer.set_footer(footer_bytes);
        let parx_bytes = writer.finish();
        let reader = ParxReader::open(&parx_bytes).expect("failed to open PARX");
        assert_eq!(reader.footer_bytes(), footer_bytes);
        assert_eq!(reader.source_size(), source_size);
        assert!(reader.validate_source_size(source_size));
        assert!(!reader.validate_source_size(source_size + 1));
        assert_eq!(reader.source_uri(), "s3://bucket/table/part-0000.parquet");
        assert!(!reader.is_compressed());
    }

    /// `open_bytes` must return a zero-copy view of an uncompressed footer:
    /// the returned slice points into the original buffer.
    #[test]
    fn test_open_bytes() {
        let footer_bytes = b"test footer";
        let source_size = 500;
        let mut writer = ParxWriter::new();
        writer.set_source_size(source_size);
        writer.set_footer(footer_bytes);
        let parx_bytes = Bytes::from(writer.finish());
        let reader = ParxReader::open_bytes(&parx_bytes).expect("failed to open PARX");
        assert_eq!(reader.footer_bytes(), footer_bytes);
        // The writer places the footer immediately after the header.
        let footer_offset = HEADER_SIZE;
        assert_eq!(
            reader.footer_bytes().as_ptr(),
            parx_bytes[footer_offset..footer_offset + footer_bytes.len()].as_ptr()
        );
    }

    /// Happy path for `validate_source_footer` on a freshly written file.
    /// Renamed from `test_invalid_checksum_length`, which described a
    /// scenario (a malformed checksum length) the test never exercised.
    #[test]
    fn test_validate_source_footer_happy_path() {
        let mut writer = ParxWriter::new();
        writer.set_footer(b"test footer");
        writer.set_source_size(1000);
        let parx_bytes = writer.finish();
        let reader = ParxReader::open(&parx_bytes).unwrap();
        assert!(reader.validate_source_footer(b"test footer"));
    }

    /// `validate_source_footer` accepts the exact footer and rejects
    /// mismatched or empty input.
    #[test]
    fn test_source_footer_validation() {
        let mut writer = ParxWriter::new();
        let footer = b"test footer bytes";
        writer.set_footer(footer);
        writer.set_source_size(1000);
        let parx_bytes = writer.finish();
        let reader = ParxReader::open(&parx_bytes).unwrap();
        assert!(reader.validate_source_footer(footer));
        assert!(!reader.validate_source_footer(b"wrong footer"));
        assert!(!reader.validate_source_footer(b""));
    }

    /// Round trip with each supported compression algorithm; the reader
    /// must transparently decompress the footer.
    #[test]
    fn test_roundtrip_with_compression() {
        let footer_bytes = b"test footer data for compression".repeat(100);
        let source_size = 1000;
        for algo in [Compression::Zstd, Compression::Lz4, Compression::Gzip] {
            let mut writer = ParxWriter::new();
            writer.set_source_size(source_size);
            writer.set_footer(&footer_bytes);
            writer.set_compression(algo);
            let parx_bytes = writer.finish();
            let reader = ParxReader::open(&parx_bytes).expect("failed to open PARX");
            assert_eq!(reader.footer_bytes(), footer_bytes.as_slice());
            assert!(reader.is_compressed());
            assert_eq!(reader.compression_algorithm(), Some(algo));
            assert_eq!(reader.uncompressed_footer_size(), footer_bytes.len() as u64);
        }
    }

    /// With page indexes present and no compression, `open_bytes` must
    /// borrow both payload sections zero-copy.
    #[test]
    fn test_open_bytes_with_page_indexes_borrows_uncompressed_payloads() {
        let mut writer = ParxWriter::new();
        writer.set_source_size(1000);
        writer.set_footer(b"footer");
        writer.set_page_indexes(b"page-index");
        let parx_bytes = Bytes::from(writer.finish());
        let reader = ParxReader::open_bytes(&parx_bytes).expect("failed to open PARX");
        assert_eq!(reader.footer_bytes(), b"footer");
        assert_eq!(reader.page_index_bytes(), Some(b"page-index".as_slice()));
        assert_eq!(
            reader.footer_bytes().as_ptr(),
            parx_bytes[HEADER_SIZE..HEADER_SIZE + b"footer".len()].as_ptr()
        );
    }
}