1use crate::compression;
19use crate::error::{ParxError, Result};
20use crate::format::{
21 Compression, Header, Trailer, HEADER_SIZE, MAGIC, MIN_FILE_SIZE, TRAILER_SIZE,
22};
23use crate::proto::ParxManifest;
24use bytes::Bytes;
25use prost::Message;
26use std::ops::Range;
27
28#[derive(Debug, Clone)]
29enum Payload {
30 Borrowed(Bytes),
31 Owned(Bytes),
32}
33
34impl Payload {
35 fn as_slice(&self) -> &[u8] {
36 match self {
37 Self::Borrowed(bytes) | Self::Owned(bytes) => bytes.as_ref(),
38 }
39 }
40}
41
/// Reader for a parsed PARX file.
///
/// Construct with [`ParxReader::open`] (copies payloads) or
/// [`ParxReader::open_bytes`] (borrows uncompressed payloads zero-copy).
#[derive(Debug, Clone)]
pub struct ParxReader {
    // Decoded fixed-size header from the start of the file.
    header: Header,
    // Protobuf manifest decoded from the region just before the trailer.
    manifest: ParxManifest,
    // Footer payload; owned when copied or decompressed, borrowed on the
    // zero-copy path.
    footer_bytes: Payload,
    // Optional page-index payload; `None` when the manifest records a
    // zero page-index length.
    page_index_bytes: Option<Payload>,
}
53
impl ParxReader {
    /// Opens a PARX file from a byte slice.
    ///
    /// Payload sections are copied into owned buffers, so the returned
    /// reader does not borrow from `bytes`.
    ///
    /// # Errors
    ///
    /// Fails if the file is too small, has an invalid magic or unsupported
    /// version, fails any checksum, declares out-of-bounds payload ranges,
    /// or cannot be decoded/decompressed.
    pub fn open(bytes: &[u8]) -> Result<Self> {
        Self::open_with_payload(bytes, |range| {
            Payload::Owned(Bytes::copy_from_slice(&bytes[range]))
        })
    }

    /// Shared parsing path for `open` and `open_bytes`.
    ///
    /// `make_payload` decides how an *uncompressed* payload range is
    /// materialized (copied vs. zero-copy borrowed); compressed payloads are
    /// always decompressed into owned buffers regardless of the strategy.
    ///
    /// Layout validated below:
    /// `[Header | footer | page index (optional) | manifest | Trailer]`.
    fn open_with_payload<F>(bytes: &[u8], make_payload: F) -> Result<Self>
    where
        F: Fn(Range<usize>) -> Payload,
    {
        let file_size = bytes.len();

        // Reject files too small to hold the fixed-size framing; this also
        // makes the header/trailer slicing below panic-free.
        if file_size < MIN_FILE_SIZE {
            return Err(ParxError::FileTooSmall {
                size: file_size,
                minimum: MIN_FILE_SIZE,
            });
        }

        // Fixed-size header at the very start of the file.
        let header_bytes: [u8; HEADER_SIZE] = bytes[..HEADER_SIZE]
            .try_into()
            .expect("header slice length verified above");
        let header = Header::from_bytes(&header_bytes);

        if !header.is_magic_valid(MAGIC) {
            return Err(ParxError::InvalidMagic(header.magic));
        }

        if !header.is_version_supported() {
            return Err(ParxError::UnsupportedVersion {
                major: header.version_major,
                minor: header.version_minor,
            });
        }

        // Fixed-size trailer at the very end of the file.
        let trailer_bytes: [u8; TRAILER_SIZE] = bytes[file_size - TRAILER_SIZE..]
            .try_into()
            .expect("trailer slice length verified above");
        let trailer = Trailer::from_bytes(&trailer_bytes);

        if !trailer.is_magic_valid(MAGIC) {
            return Err(ParxError::InvalidMagic(trailer.magic));
        }

        // The manifest sits immediately before the trailer; its length comes
        // from the (untrusted) trailer. `checked_sub` guards against a length
        // that would underflow past the start of the file. A manifest that
        // would overlap the header is not rejected here, but the footer
        // bounds check below (`footer_end > manifest_start`) then fails.
        let manifest_end = file_size - TRAILER_SIZE;
        let manifest_start = manifest_end
            .checked_sub(trailer.manifest_len as usize)
            .ok_or(ParxError::FileTooSmall {
                size: file_size,
                minimum: MIN_FILE_SIZE + trailer.manifest_len as usize,
            })?;

        let manifest_bytes = &bytes[manifest_start..manifest_end];

        // Verify the manifest CRC32C before attempting to decode it.
        let actual_crc = crc32c::crc32c(manifest_bytes);
        if actual_crc != trailer.manifest_crc32c {
            return Err(ParxError::ManifestChecksumMismatch {
                expected: trailer.manifest_crc32c,
                actual: actual_crc,
            });
        }

        let manifest = ParxManifest::decode(manifest_bytes)?;

        // Footer bounds come from the untrusted manifest: convert each u64 to
        // usize, compute the exclusive end with overflow checking, then
        // range-check against the file layout.
        let footer_offset = usize::try_from(manifest.footer_offset).map_err(|_| {
            ParxError::InvalidPayloadBounds {
                offset: manifest.footer_offset,
                length: manifest.footer_length,
                file_size: file_size as u64,
            }
        })?;
        let footer_length = usize::try_from(manifest.footer_length).map_err(|_| {
            ParxError::InvalidPayloadBounds {
                offset: manifest.footer_offset,
                length: manifest.footer_length,
                file_size: file_size as u64,
            }
        })?;
        let footer_end =
            footer_offset
                .checked_add(footer_length)
                .ok_or(ParxError::InvalidPayloadBounds {
                    offset: manifest.footer_offset,
                    length: manifest.footer_length,
                    file_size: file_size as u64,
                })?;

        // The footer must live entirely between the header and the manifest.
        if footer_offset < HEADER_SIZE || footer_end > manifest_start {
            return Err(ParxError::InvalidPayloadBounds {
                offset: manifest.footer_offset,
                length: manifest.footer_length,
                file_size: file_size as u64,
            });
        }

        let stored_footer_bytes = &bytes[footer_offset..footer_end];

        // The manifest records the CRC32C of the footer *as stored* (i.e. of
        // the compressed bytes when compression is enabled), serialized
        // little-endian.
        let footer_crc = crc32c::crc32c(stored_footer_bytes);
        if footer_crc.to_le_bytes().as_slice() != manifest.footer_checksum.as_slice() {
            return Err(ParxError::FooterChecksumMismatch);
        }

        // Compressed footers are decompressed into an owned buffer;
        // uncompressed footers are materialized by the caller's strategy.
        let footer_bytes = if let Some(algo) = header.compression_algorithm() {
            let uncompressed_size =
                usize::try_from(manifest.footer_uncompressed_size).map_err(|_| {
                    ParxError::InvalidFormat("footer uncompressed size too large".to_string())
                })?;
            Payload::Owned(Bytes::from(compression::decompress(
                stored_footer_bytes,
                algo,
                uncompressed_size,
            )?))
        } else {
            make_payload(footer_offset..footer_end)
        };

        // Optional page-index section, present only when the manifest records
        // a non-zero length. Same convert / overflow-check / bounds-check /
        // CRC / decompress sequence as the footer, except it must sit after
        // the footer (`page_index_offset >= footer_end`).
        let page_index_bytes = if manifest.page_index_length > 0 {
            let page_index_offset = usize::try_from(manifest.page_index_offset).map_err(|_| {
                ParxError::InvalidPayloadBounds {
                    offset: manifest.page_index_offset,
                    length: manifest.page_index_length,
                    file_size: file_size as u64,
                }
            })?;
            let page_index_length = usize::try_from(manifest.page_index_length).map_err(|_| {
                ParxError::InvalidPayloadBounds {
                    offset: manifest.page_index_offset,
                    length: manifest.page_index_length,
                    file_size: file_size as u64,
                }
            })?;
            let page_index_end = page_index_offset.checked_add(page_index_length).ok_or(
                ParxError::InvalidPayloadBounds {
                    offset: manifest.page_index_offset,
                    length: manifest.page_index_length,
                    file_size: file_size as u64,
                },
            )?;

            if page_index_offset < footer_end || page_index_end > manifest_start {
                return Err(ParxError::InvalidPayloadBounds {
                    offset: manifest.page_index_offset,
                    length: manifest.page_index_length,
                    file_size: file_size as u64,
                });
            }

            let stored_page_index_bytes = &bytes[page_index_offset..page_index_end];

            // CRC is over the stored (possibly compressed) bytes, like the footer.
            let page_index_crc = crc32c::crc32c(stored_page_index_bytes);
            if page_index_crc.to_le_bytes().as_slice() != manifest.page_index_checksum.as_slice() {
                return Err(ParxError::PageIndexChecksumMismatch);
            }

            let page_indexes = if let Some(algo) = header.compression_algorithm() {
                let uncompressed_size = usize::try_from(manifest.page_index_uncompressed_size)
                    .map_err(|_| {
                        ParxError::InvalidFormat(
                            "page index uncompressed size too large".to_string(),
                        )
                    })?;
                Payload::Owned(Bytes::from(compression::decompress(
                    stored_page_index_bytes,
                    algo,
                    uncompressed_size,
                )?))
            } else {
                make_payload(page_index_offset..page_index_end)
            };

            Some(page_indexes)
        } else {
            None
        };

        Ok(Self {
            header,
            manifest,
            footer_bytes,
            page_index_bytes,
        })
    }

    /// Opens a PARX file from a `Bytes` buffer, borrowing *uncompressed*
    /// payload sections zero-copy via `Bytes::slice` (compressed sections
    /// are still decompressed into owned buffers).
    ///
    /// # Errors
    ///
    /// Same failure modes as [`ParxReader::open`].
    pub fn open_bytes(bytes: &Bytes) -> Result<Self> {
        Self::open_with_payload(bytes, |range| Payload::Borrowed(bytes.slice(range)))
    }

    /// The decoded fixed-size file header.
    #[inline]
    pub const fn header(&self) -> &Header {
        &self.header
    }

    /// The decoded manifest.
    #[inline]
    pub const fn manifest(&self) -> &ParxManifest {
        &self.manifest
    }

    /// The footer payload, already decompressed if it was stored compressed.
    #[inline]
    pub fn footer_bytes(&self) -> &[u8] {
        self.footer_bytes.as_slice()
    }

    /// Whether the file's footer was stored compressed (per the header).
    #[inline]
    pub const fn is_compressed(&self) -> bool {
        self.header.is_footer_compressed()
    }

    /// The compression algorithm declared in the header, or `None` when the
    /// payloads are stored uncompressed.
    #[inline]
    pub const fn compression_algorithm(&self) -> Option<Compression> {
        self.header.compression_algorithm()
    }

    /// The uncompressed footer size recorded in the manifest.
    #[inline]
    pub const fn uncompressed_footer_size(&self) -> u64 {
        self.manifest.footer_uncompressed_size
    }

    /// Whether a page-index payload was present and loaded.
    #[inline]
    pub const fn has_page_indexes(&self) -> bool {
        self.page_index_bytes.is_some()
    }

    /// The page-index payload (decompressed if stored compressed), or `None`
    /// when the file carries no page indexes.
    #[inline]
    pub fn page_index_bytes(&self) -> Option<&[u8]> {
        self.page_index_bytes.as_ref().map(Payload::as_slice)
    }

    /// The uncompressed page-index size recorded in the manifest.
    #[inline]
    pub const fn uncompressed_page_index_size(&self) -> u64 {
        self.manifest.page_index_uncompressed_size
    }

    /// Returns `true` when `source_size` matches the source size recorded in
    /// the manifest.
    #[inline]
    pub const fn validate_source_size(&self, source_size: u64) -> bool {
        self.manifest.source_size == source_size
    }

    /// Validates the source file's footer bytes against the source-footer
    /// checksum recorded in the manifest.
    ///
    /// Returns `false` (rather than erroring) when the stored checksum is not
    /// exactly 4 bytes, i.e. not a well-formed little-endian CRC32C.
    pub fn validate_source_footer(&self, original_footer: &[u8]) -> bool {
        if self.manifest.source_footer_checksum.len() != 4 {
            return false;
        }
        let footer_crc32c = crc32c::crc32c(original_footer);
        footer_crc32c.to_le_bytes().as_slice() == self.manifest.source_footer_checksum.as_slice()
    }

    /// The source URI recorded in the manifest (may be empty if never set).
    #[inline]
    pub fn source_uri(&self) -> &str {
        &self.manifest.source_uri
    }

    /// The source file size recorded in the manifest.
    #[inline]
    pub const fn source_size(&self) -> u64 {
        self.manifest.source_size
    }

    /// The creation timestamp (milliseconds) recorded in the manifest.
    #[inline]
    pub const fn created_at_ms(&self) -> u64 {
        self.manifest.created_at_ms
    }
}
378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::writer::ParxWriter;

    #[test]
    fn test_roundtrip() {
        let footer = b"fake parquet footer data for testing";
        let size = 1024 * 1024;

        let mut w = ParxWriter::new();
        w.set_source_uri("s3://bucket/table/part-0000.parquet");
        w.set_source_size(size);
        w.set_footer(footer);

        let encoded = w.finish();
        let reader = ParxReader::open(&encoded).expect("failed to open PARX");

        assert_eq!(reader.footer_bytes(), footer);
        assert_eq!(reader.source_size(), size);
        assert!(reader.validate_source_size(size));
        assert!(!reader.validate_source_size(size + 1));
        assert_eq!(reader.source_uri(), "s3://bucket/table/part-0000.parquet");
        assert!(!reader.is_compressed());
    }

    #[test]
    fn test_open_bytes() {
        let footer = b"test footer";

        let mut w = ParxWriter::new();
        w.set_source_size(500);
        w.set_footer(footer);

        let encoded = Bytes::from(w.finish());
        let reader = ParxReader::open_bytes(&encoded).expect("failed to open PARX");

        assert_eq!(reader.footer_bytes(), footer);
        // Zero-copy path: the returned slice must alias the input buffer.
        let start = HEADER_SIZE;
        assert_eq!(
            reader.footer_bytes().as_ptr(),
            encoded[start..start + footer.len()].as_ptr()
        );
    }

    #[test]
    fn test_invalid_checksum_length() {
        // NOTE(review): despite the name, this only exercises the happy path;
        // the short-checksum branch of `validate_source_footer` is not
        // reachable through `ParxWriter` here — TODO confirm intent.
        let mut w = ParxWriter::new();
        w.set_footer(b"test footer");
        w.set_source_size(1000);
        let encoded = w.finish();

        let reader = ParxReader::open(&encoded).unwrap();

        assert!(reader.validate_source_footer(b"test footer"));
    }

    #[test]
    fn test_source_footer_validation() {
        let footer = b"test footer bytes";

        let mut w = ParxWriter::new();
        w.set_footer(footer);
        w.set_source_size(1000);
        let encoded = w.finish();

        let reader = ParxReader::open(&encoded).unwrap();

        // Matching footer validates; anything else (including empty) does not.
        assert!(reader.validate_source_footer(footer));
        assert!(!reader.validate_source_footer(b"wrong footer"));
        assert!(!reader.validate_source_footer(b""));
    }

    #[test]
    fn test_roundtrip_with_compression() {
        let footer = b"test footer data for compression".repeat(100);

        for algo in [Compression::Zstd, Compression::Lz4, Compression::Gzip] {
            let mut w = ParxWriter::new();
            w.set_source_size(1000);
            w.set_footer(&footer);
            w.set_compression(algo);

            let encoded = w.finish();
            let reader = ParxReader::open(&encoded).expect("failed to open PARX");

            assert_eq!(reader.footer_bytes(), footer.as_slice());
            assert!(reader.is_compressed());
            assert_eq!(reader.compression_algorithm(), Some(algo));
            assert_eq!(reader.uncompressed_footer_size(), footer.len() as u64);
        }
    }

    #[test]
    fn test_open_bytes_with_page_indexes_borrows_uncompressed_payloads() {
        let mut w = ParxWriter::new();
        w.set_source_size(1000);
        w.set_footer(b"footer");
        w.set_page_indexes(b"page-index");

        let encoded = Bytes::from(w.finish());
        let reader = ParxReader::open_bytes(&encoded).expect("failed to open PARX");

        assert_eq!(reader.footer_bytes(), b"footer");
        assert_eq!(reader.page_index_bytes(), Some(b"page-index".as_slice()));
        // Uncompressed footer must be borrowed zero-copy from the input.
        assert_eq!(
            reader.footer_bytes().as_ptr(),
            encoded[HEADER_SIZE..HEADER_SIZE + b"footer".len()].as_ptr()
        );
    }
}