Skip to main content

parx_rs/
reader.rs

/*
 * Copyright 2026 PARX Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
//! PARX file reader.
17
18use crate::compression;
19use crate::error::{ParxError, Result};
20use crate::format::{
21    Compression, Header, Trailer, HEADER_SIZE, MAGIC, MIN_FILE_SIZE, TRAILER_SIZE,
22};
23use crate::proto::ParxManifest;
24use bytes::Bytes;
25use prost::Message;
26use std::ops::Range;
27
28#[derive(Debug, Clone)]
29enum Payload {
30    Borrowed(Bytes),
31    Owned(Bytes),
32}
33
34impl Payload {
35    fn as_slice(&self) -> &[u8] {
36        match self {
37            Self::Borrowed(bytes) | Self::Owned(bytes) => bytes.as_ref(),
38        }
39    }
40}
41
42/// Reader for PARX sidecar files.
43///
44/// Parses and validates PARX files, providing access to the cached Parquet footer bytes
45/// and optional page index data.
46#[derive(Debug, Clone)]
47pub struct ParxReader {
48    header: Header,
49    manifest: ParxManifest,
50    footer_bytes: Payload,
51    page_index_bytes: Option<Payload>,
52}
53
54impl ParxReader {
55    /// Open a PARX file from bytes.
56    ///
57    /// This validates the file structure, checksums, and extracts the manifest,
58    /// footer payload, and optional page index data.
59    ///
60    /// # Errors
61    /// Returns error if file is invalid or corrupted.
62    ///
63    /// # Panics
64    /// Panics if file size is less than minimum header size.
65    pub fn open(bytes: &[u8]) -> Result<Self> {
66        Self::open_with_payload(bytes, |range| {
67            Payload::Owned(Bytes::copy_from_slice(&bytes[range]))
68        })
69    }
70
71    fn open_with_payload<F>(bytes: &[u8], make_payload: F) -> Result<Self>
72    where
73        F: Fn(Range<usize>) -> Payload,
74    {
75        let file_size = bytes.len();
76
77        // Check minimum size
78        if file_size < MIN_FILE_SIZE {
79            return Err(ParxError::FileTooSmall {
80                size: file_size,
81                minimum: MIN_FILE_SIZE,
82            });
83        }
84
85        // Parse header
86        let header_bytes: [u8; HEADER_SIZE] = bytes[..HEADER_SIZE]
87            .try_into()
88            .expect("header slice length verified above");
89        let header = Header::from_bytes(&header_bytes);
90
91        // Validate header magic
92        if !header.is_magic_valid(MAGIC) {
93            return Err(ParxError::InvalidMagic(header.magic));
94        }
95
96        // Validate version
97        if !header.is_version_supported() {
98            return Err(ParxError::UnsupportedVersion {
99                major: header.version_major,
100                minor: header.version_minor,
101            });
102        }
103
104        // Parse trailer
105        let trailer_bytes: [u8; TRAILER_SIZE] = bytes[file_size - TRAILER_SIZE..]
106            .try_into()
107            .expect("trailer slice length verified above");
108        let trailer = Trailer::from_bytes(&trailer_bytes);
109
110        // Validate trailer magic
111        if !trailer.is_magic_valid(MAGIC) {
112            return Err(ParxError::InvalidMagic(trailer.magic));
113        }
114
115        // Extract and validate manifest
116        let manifest_end = file_size - TRAILER_SIZE;
117        let manifest_start = manifest_end
118            .checked_sub(trailer.manifest_len as usize)
119            .ok_or(ParxError::FileTooSmall {
120                size: file_size,
121                minimum: MIN_FILE_SIZE + trailer.manifest_len as usize,
122            })?;
123
124        let manifest_bytes = &bytes[manifest_start..manifest_end];
125
126        // Verify manifest CRC32C
127        let actual_crc = crc32c::crc32c(manifest_bytes);
128        if actual_crc != trailer.manifest_crc32c {
129            return Err(ParxError::ManifestChecksumMismatch {
130                expected: trailer.manifest_crc32c,
131                actual: actual_crc,
132            });
133        }
134
135        // Decode manifest
136        let manifest = ParxManifest::decode(manifest_bytes)?;
137
138        // Extract footer bytes
139        let footer_offset = usize::try_from(manifest.footer_offset).map_err(|_| {
140            ParxError::InvalidPayloadBounds {
141                offset: manifest.footer_offset,
142                length: manifest.footer_length,
143                file_size: file_size as u64,
144            }
145        })?;
146        let footer_length = usize::try_from(manifest.footer_length).map_err(|_| {
147            ParxError::InvalidPayloadBounds {
148                offset: manifest.footer_offset,
149                length: manifest.footer_length,
150                file_size: file_size as u64,
151            }
152        })?;
153        let footer_end =
154            footer_offset
155                .checked_add(footer_length)
156                .ok_or(ParxError::InvalidPayloadBounds {
157                    offset: manifest.footer_offset,
158                    length: manifest.footer_length,
159                    file_size: file_size as u64,
160                })?;
161
162        // Validate footer bounds
163        if footer_offset < HEADER_SIZE || footer_end > manifest_start {
164            return Err(ParxError::InvalidPayloadBounds {
165                offset: manifest.footer_offset,
166                length: manifest.footer_length,
167                file_size: file_size as u64,
168            });
169        }
170
171        let stored_footer_bytes = &bytes[footer_offset..footer_end];
172
173        // Verify footer checksum (on stored/compressed bytes)
174        let footer_crc = crc32c::crc32c(stored_footer_bytes);
175        if footer_crc.to_le_bytes().as_slice() != manifest.footer_checksum.as_slice() {
176            return Err(ParxError::FooterChecksumMismatch);
177        }
178
179        // Decompress footer if needed
180        let footer_bytes = if let Some(algo) = header.compression_algorithm() {
181            let uncompressed_size =
182                usize::try_from(manifest.footer_uncompressed_size).map_err(|_| {
183                    ParxError::InvalidFormat("footer uncompressed size too large".to_string())
184                })?;
185            Payload::Owned(Bytes::from(compression::decompress(
186                stored_footer_bytes,
187                algo,
188                uncompressed_size,
189            )?))
190        } else {
191            make_payload(footer_offset..footer_end)
192        };
193
194        // Extract page index bytes if present
195        let page_index_bytes = if manifest.page_index_length > 0 {
196            let page_index_offset = usize::try_from(manifest.page_index_offset).map_err(|_| {
197                ParxError::InvalidPayloadBounds {
198                    offset: manifest.page_index_offset,
199                    length: manifest.page_index_length,
200                    file_size: file_size as u64,
201                }
202            })?;
203            let page_index_length = usize::try_from(manifest.page_index_length).map_err(|_| {
204                ParxError::InvalidPayloadBounds {
205                    offset: manifest.page_index_offset,
206                    length: manifest.page_index_length,
207                    file_size: file_size as u64,
208                }
209            })?;
210            let page_index_end = page_index_offset.checked_add(page_index_length).ok_or(
211                ParxError::InvalidPayloadBounds {
212                    offset: manifest.page_index_offset,
213                    length: manifest.page_index_length,
214                    file_size: file_size as u64,
215                },
216            )?;
217
218            // Validate page index bounds
219            if page_index_offset < footer_end || page_index_end > manifest_start {
220                return Err(ParxError::InvalidPayloadBounds {
221                    offset: manifest.page_index_offset,
222                    length: manifest.page_index_length,
223                    file_size: file_size as u64,
224                });
225            }
226
227            let stored_page_index_bytes = &bytes[page_index_offset..page_index_end];
228
229            // Verify page index checksum
230            let page_index_crc = crc32c::crc32c(stored_page_index_bytes);
231            if page_index_crc.to_le_bytes().as_slice() != manifest.page_index_checksum.as_slice() {
232                return Err(ParxError::PageIndexChecksumMismatch);
233            }
234
235            // Decompress page indexes if needed (same compression as footer)
236            let page_indexes = if let Some(algo) = header.compression_algorithm() {
237                let uncompressed_size = usize::try_from(manifest.page_index_uncompressed_size)
238                    .map_err(|_| {
239                        ParxError::InvalidFormat(
240                            "page index uncompressed size too large".to_string(),
241                        )
242                    })?;
243                Payload::Owned(Bytes::from(compression::decompress(
244                    stored_page_index_bytes,
245                    algo,
246                    uncompressed_size,
247                )?))
248            } else {
249                make_payload(page_index_offset..page_index_end)
250            };
251
252            Some(page_indexes)
253        } else {
254            None
255        };
256
257        Ok(Self {
258            header,
259            manifest,
260            footer_bytes,
261            page_index_bytes,
262        })
263    }
264
265    /// Open a PARX file from owned Bytes (zero-copy where possible).
266    /// Use this when you already have `Bytes` (e.g., from object_store) to potentially reduce allocations.
267    ///
268    /// # Errors
269    /// Returns error if file is invalid or corrupted.
270    pub fn open_bytes(bytes: &Bytes) -> Result<Self> {
271        Self::open_with_payload(bytes, |range| Payload::Borrowed(bytes.slice(range)))
272    }
273
274    /// Get the parsed header.
275    #[inline]
276    pub const fn header(&self) -> &Header {
277        &self.header
278    }
279
280    /// Get the parsed manifest.
281    #[inline]
282    pub const fn manifest(&self) -> &ParxManifest {
283        &self.manifest
284    }
285
286    /// Get the cached Parquet footer bytes (decompressed if necessary).
287    #[inline]
288    pub fn footer_bytes(&self) -> &[u8] {
289        self.footer_bytes.as_slice()
290    }
291
292    /// Check if the footer was stored compressed.
293    #[inline]
294    pub const fn is_compressed(&self) -> bool {
295        self.header.is_footer_compressed()
296    }
297
298    /// Get the compression algorithm used, if any.
299    #[inline]
300    pub const fn compression_algorithm(&self) -> Option<Compression> {
301        self.header.compression_algorithm()
302    }
303
304    /// Get the original uncompressed footer size.
305    ///
306    /// Returns 0 if footer was not compressed.
307    #[inline]
308    pub const fn uncompressed_footer_size(&self) -> u64 {
309        self.manifest.footer_uncompressed_size
310    }
311
312    // === Page Index Methods ===
313
314    /// Check if this PARX file contains page index data.
315    #[inline]
316    pub const fn has_page_indexes(&self) -> bool {
317        self.page_index_bytes.is_some()
318    }
319
320    /// Get raw page index payload bytes, if present (decompressed).
321    ///
322    /// Page indexes include concatenated ColumnIndex and OffsetIndex structures
323    /// from Parquet V2 format.
324    #[inline]
325    pub fn page_index_bytes(&self) -> Option<&[u8]> {
326        self.page_index_bytes.as_ref().map(Payload::as_slice)
327    }
328
329    /// Get the original uncompressed page index size.
330    ///
331    /// Returns 0 if page indexes were not compressed.
332    #[inline]
333    pub const fn uncompressed_page_index_size(&self) -> u64 {
334        self.manifest.page_index_uncompressed_size
335    }
336
337    // === Validation Methods ===
338
339    /// Validate that this PARX file matches the given Parquet file size.
340    ///
341    /// This is the fast validation check that should always be performed
342    /// before using cached footer bytes.
343    #[inline]
344    pub const fn validate_source_size(&self, source_size: u64) -> bool {
345        self.manifest.source_size == source_size
346    }
347
348    /// Validate that this PARX file matches the given Parquet footer hash.
349    ///
350    /// This is the paranoid validation check that re-hashes the original
351    /// Parquet footer bytes. Only use when you have the original footer.
352    pub fn validate_source_footer(&self, original_footer: &[u8]) -> bool {
353        if self.manifest.source_footer_checksum.len() != 4 {
354            return false; // Wrong checksum length
355        }
356        let footer_crc32c = crc32c::crc32c(original_footer);
357        footer_crc32c.to_le_bytes().as_slice() == self.manifest.source_footer_checksum.as_slice()
358    }
359
360    /// Get the source URI from the manifest (may be empty).
361    #[inline]
362    pub fn source_uri(&self) -> &str {
363        &self.manifest.source_uri
364    }
365
366    /// Get the source file size from the manifest.
367    #[inline]
368    pub const fn source_size(&self) -> u64 {
369        self.manifest.source_size
370    }
371
372    /// Get the creation timestamp in milliseconds.
373    #[inline]
374    pub const fn created_at_ms(&self) -> u64 {
375        self.manifest.created_at_ms
376    }
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::writer::ParxWriter;

    /// Write an uncompressed file and read every basic accessor back.
    #[test]
    fn test_roundtrip() {
        let footer_bytes = b"fake parquet footer data for testing";
        let source_size = 1024 * 1024; // 1 MB

        let mut writer = ParxWriter::new();
        writer.set_source_uri("s3://bucket/table/part-0000.parquet");
        writer.set_source_size(source_size);
        writer.set_footer(footer_bytes);

        let parx_bytes = writer.finish();

        let reader = ParxReader::open(&parx_bytes).expect("failed to open PARX");

        assert_eq!(reader.footer_bytes(), footer_bytes);
        assert_eq!(reader.source_size(), source_size);
        assert!(reader.validate_source_size(source_size));
        assert!(!reader.validate_source_size(source_size + 1));
        assert_eq!(reader.source_uri(), "s3://bucket/table/part-0000.parquet");
        assert!(!reader.is_compressed());
    }

    /// `open_bytes` must hand out a slice of the input buffer, not a copy.
    #[test]
    fn test_open_bytes() {
        let footer_bytes = b"test footer";
        let source_size = 500;

        let mut writer = ParxWriter::new();
        writer.set_source_size(source_size);
        writer.set_footer(footer_bytes);

        let parx_bytes = Bytes::from(writer.finish());
        let reader = ParxReader::open_bytes(&parx_bytes).expect("failed to open PARX");

        assert_eq!(reader.footer_bytes(), footer_bytes);
        // Pointer equality proves the footer is borrowed from `parx_bytes`.
        let footer_offset = HEADER_SIZE;
        assert_eq!(
            reader.footer_bytes().as_ptr(),
            parx_bytes[footer_offset..footer_offset + footer_bytes.len()].as_ptr()
        );
    }

    /// Input shorter than the minimum file size is an error, not a panic.
    #[test]
    fn test_open_rejects_truncated_input() {
        assert!(matches!(
            ParxReader::open(&[]),
            Err(ParxError::FileTooSmall { .. })
        ));
    }

    /// Flipping a stored footer byte must trip the footer checksum.
    #[test]
    fn test_corrupted_footer_is_rejected() {
        let mut writer = ParxWriter::new();
        writer.set_source_size(1000);
        writer.set_footer(b"test footer");

        let mut parx_bytes = writer.finish().to_vec();
        // The uncompressed footer starts right after the header (the same
        // layout `test_open_bytes` relies on).
        parx_bytes[HEADER_SIZE] ^= 0xFF;

        assert!(matches!(
            ParxReader::open(&parx_bytes),
            Err(ParxError::FooterChecksumMismatch)
        ));
    }

    /// A recorded source-footer checksum that is not 4 bytes long never validates.
    #[test]
    fn test_invalid_checksum_length() {
        let mut writer = ParxWriter::new();
        writer.set_footer(b"test footer");
        writer.set_source_size(1000);
        let parx_bytes = writer.finish();

        let mut reader = ParxReader::open(&parx_bytes).unwrap();

        // Baseline: the writer-produced 4-byte checksum validates.
        assert!(reader.validate_source_footer(b"test footer"));

        // Re-encoding a corrupted manifest is unnecessary: the check only
        // reads the decoded manifest, so mutate that directly.
        reader.manifest.source_footer_checksum.truncate(3);
        assert!(!reader.validate_source_footer(b"test footer"));

        reader.manifest.source_footer_checksum = vec![0; 5];
        assert!(!reader.validate_source_footer(b"test footer"));
    }

    /// Only the exact original footer passes the paranoid check.
    #[test]
    fn test_source_footer_validation() {
        let mut writer = ParxWriter::new();
        let footer = b"test footer bytes";
        writer.set_footer(footer);
        writer.set_source_size(1000);
        let parx_bytes = writer.finish();

        let reader = ParxReader::open(&parx_bytes).unwrap();

        // Correct footer validates
        assert!(reader.validate_source_footer(footer));

        // Wrong footer fails
        assert!(!reader.validate_source_footer(b"wrong footer"));

        // Empty footer fails
        assert!(!reader.validate_source_footer(b""));
    }

    /// Every supported algorithm must round-trip the footer.
    #[test]
    fn test_roundtrip_with_compression() {
        let footer_bytes = b"test footer data for compression".repeat(100);
        let source_size = 1000;

        for algo in [Compression::Zstd, Compression::Lz4, Compression::Gzip] {
            let mut writer = ParxWriter::new();
            writer.set_source_size(source_size);
            writer.set_footer(&footer_bytes);
            writer.set_compression(algo);

            let parx_bytes = writer.finish();
            let reader = ParxReader::open(&parx_bytes).expect("failed to open PARX");

            assert_eq!(reader.footer_bytes(), footer_bytes.as_slice());
            assert!(reader.is_compressed());
            assert_eq!(reader.compression_algorithm(), Some(algo));
            assert_eq!(reader.uncompressed_footer_size(), footer_bytes.len() as u64);
        }
    }

    /// With no compression, both the footer and the page index are borrowed.
    #[test]
    fn test_open_bytes_with_page_indexes_borrows_uncompressed_payloads() {
        let mut writer = ParxWriter::new();
        writer.set_source_size(1000);
        writer.set_footer(b"footer");
        writer.set_page_indexes(b"page-index");

        let parx_bytes = Bytes::from(writer.finish());
        let reader = ParxReader::open_bytes(&parx_bytes).expect("failed to open PARX");

        assert_eq!(reader.footer_bytes(), b"footer");
        assert_eq!(reader.page_index_bytes(), Some(b"page-index".as_slice()));
        assert_eq!(
            reader.footer_bytes().as_ptr(),
            parx_bytes[HEADER_SIZE..HEADER_SIZE + b"footer".len()].as_ptr()
        );

        // The page index is sliced at the offset the manifest records, so its
        // pointer must land at that offset inside the input buffer.
        let page_index_offset = usize::try_from(reader.manifest().page_index_offset)
            .expect("page index offset fits in usize");
        let page_index = reader.page_index_bytes().expect("page index present");
        assert_eq!(
            page_index.as_ptr(),
            parx_bytes[page_index_offset..].as_ptr()
        );
    }
}