Skip to main content

parx_rs/
bundle.rs

1/*
2 * Copyright 2026 PARX Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17use crate::error::{ParxError, Result};
18use crate::format::{BundleHeader, Trailer, BUNDLE_HEADER_SIZE, BUNDLE_MAGIC, TRAILER_SIZE};
19use crate::proto::{BundleEntry, ParxBundle};
20use bytes::Bytes;
21use prost::Message;
22use std::collections::HashMap;
23use std::time::{SystemTime, UNIX_EPOCH};
24
25/// Default bundle filename.
26pub const BUNDLE_FILENAME: &str = "_parx_bundle.parx";
27
/// Data for a single entry in the bundle.
#[derive(Debug, Clone)]
pub struct BundleEntryData {
    /// Relative path to the Parquet file; used as the lookup key by readers.
    pub parquet_path: String,
    /// Source file size for staleness detection.
    pub source_size: u64,
    /// Raw footer bytes.
    pub footer_bytes: Bytes,
    /// Optional raw page index bytes; an empty `Bytes` means "no page index".
    pub page_index_bytes: Bytes,
}
40
/// Writer for PARX bundle files.
///
/// Combines multiple Parquet file footers into a single bundle file.
#[derive(Debug)]
pub struct ParxBundleWriter {
    /// Entries accumulated so far, in insertion order; serialized by `finish`.
    entries: Vec<BundleEntryData>,
}
48
49impl ParxBundleWriter {
50    /// Create a new bundle writer.
51    pub const fn new() -> Self {
52        Self {
53            entries: Vec::new(),
54        }
55    }
56
57    /// Add an entry to the bundle.
58    pub fn add_entry(
59        &mut self,
60        parquet_path: &str,
61        source_size: u64,
62        footer_bytes: impl Into<Bytes>,
63    ) {
64        self.add_entry_with_page_indexes(parquet_path, source_size, footer_bytes, Bytes::new());
65    }
66
67    /// Add an entry to the bundle with optional page indexes.
68    pub fn add_entry_with_page_indexes(
69        &mut self,
70        parquet_path: &str,
71        source_size: u64,
72        footer_bytes: impl Into<Bytes>,
73        page_index_bytes: impl Into<Bytes>,
74    ) {
75        self.entries.push(BundleEntryData {
76            parquet_path: parquet_path.to_string(),
77            source_size,
78            footer_bytes: footer_bytes.into(),
79            page_index_bytes: page_index_bytes.into(),
80        });
81    }
82
83    /// Get the number of entries in the bundle.
84    #[inline]
85    pub fn entry_count(&self) -> usize {
86        self.entries.len()
87    }
88
89    /// Build the bundle file bytes
90    ///
91    /// # Panics
92    /// Panics if manifest exceeds 4GB.
93    pub fn finish(self) -> Vec<u8> {
94        let header = BundleHeader::new(self.entries.len() as u64);
95        let header_bytes = header.to_bytes();
96
97        let mut current_offset = BUNDLE_HEADER_SIZE as u64;
98        let mut payload = Vec::new();
99        let mut bundle_entries = Vec::new();
100
101        for entry in &self.entries {
102            // Footer section
103            let footer_offset = current_offset;
104            let footer_length = entry.footer_bytes.len() as u64;
105            let footer_checksum = crc32c::crc32c(&entry.footer_bytes).to_le_bytes().to_vec();
106
107            payload.extend_from_slice(&entry.footer_bytes);
108            current_offset += footer_length;
109
110            // Optional page index section
111            let (page_index_offset, page_index_length, page_index_checksum) =
112                if entry.page_index_bytes.is_empty() {
113                    (0, 0, Vec::new())
114                } else {
115                    let page_index_offset = current_offset;
116                    let page_index_length = entry.page_index_bytes.len() as u64;
117                    let page_index_checksum = crc32c::crc32c(&entry.page_index_bytes)
118                        .to_le_bytes()
119                        .to_vec();
120                    payload.extend_from_slice(&entry.page_index_bytes);
121                    current_offset += page_index_length;
122                    (page_index_offset, page_index_length, page_index_checksum)
123                };
124
125            bundle_entries.push(BundleEntry {
126                parquet_path: entry.parquet_path.clone(),
127                source_size: entry.source_size,
128                footer_offset,
129                footer_length,
130                footer_checksum,
131                page_index_offset,
132                page_index_length,
133                page_index_checksum,
134            });
135        }
136
137        // Get current timestamp (safe cast: won't overflow until year 2554)
138        #[allow(clippy::cast_possible_truncation)]
139        let created_at_ms = SystemTime::now()
140            .duration_since(UNIX_EPOCH)
141            .map(|d| d.as_millis() as u64)
142            .unwrap_or(0);
143
144        // Build bundle manifest
145        let bundle = ParxBundle {
146            version: 1,
147            created_at_ms,
148            entries: bundle_entries,
149        };
150
151        let manifest_bytes = bundle.encode_to_vec();
152        let manifest_crc = crc32c::crc32c(&manifest_bytes);
153
154        // Build trailer with bundle magic
155        let manifest_len = u32::try_from(manifest_bytes.len()).expect("manifest too large (>4GB)");
156        let trailer = Trailer::new(manifest_len, manifest_crc, BUNDLE_MAGIC);
157        let trailer_bytes = trailer.to_bytes();
158
159        // Assemble file
160        let total_size =
161            header_bytes.len() + payload.len() + manifest_bytes.len() + trailer_bytes.len();
162
163        let mut output = Vec::with_capacity(total_size);
164        output.extend_from_slice(&header_bytes);
165        output.extend_from_slice(&payload);
166        output.extend_from_slice(&manifest_bytes);
167        output.extend_from_slice(&trailer_bytes);
168
169        output
170    }
171}
172
173impl Default for ParxBundleWriter {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
/// Reference to a single entry in a bundle.
///
/// Borrows directly from the reader's underlying buffer; no copies are made.
#[derive(Debug, Clone)]
pub struct BundleEntryRef<'a> {
    /// Relative path to the Parquet file.
    pub parquet_path: &'a str,
    /// Source file size.
    pub source_size: u64,
    /// Footer bytes.
    pub footer_bytes: &'a [u8],
    /// Optional page index bytes (`None` when the entry has no page index).
    pub page_index_bytes: Option<&'a [u8]>,
}
191
/// Reader for PARX bundle files.
///
/// Provides access to all cached footers in the bundle.
#[derive(Debug, Clone)]
pub struct ParxBundleReader {
    /// Parsed fixed-size bundle header (magic, version, entry count).
    header: BundleHeader,
    /// Decoded manifest describing where each entry's payload lives.
    bundle: ParxBundle,
    /// The complete bundle file bytes; returned slices borrow from this.
    data: Bytes,
    /// Index for fast path lookup.
    path_index: HashMap<String, usize>,
}
203
204impl ParxBundleReader {
205    /// Open a bundle from bytes.
206    ///
207    /// # Errors
208    /// Returns error if file is invalid or corrupted.
209    ///
210    /// # Panics
211    /// Panics if file size is less than minimum header size.
212    pub fn open(data: Bytes) -> Result<Self> {
213        let file_size = data.len();
214        let min_size = BUNDLE_HEADER_SIZE + TRAILER_SIZE;
215
216        if file_size < min_size {
217            return Err(ParxError::FileTooSmall {
218                size: file_size,
219                minimum: min_size,
220            });
221        }
222
223        // Parse header
224        let header_bytes: [u8; BUNDLE_HEADER_SIZE] = data[..BUNDLE_HEADER_SIZE]
225            .try_into()
226            .expect("header slice length verified above");
227        let header = BundleHeader::from_bytes(&header_bytes);
228
229        // Validate header magic
230        if !header.is_magic_valid() {
231            return Err(ParxError::InvalidBundleMagic(header.magic));
232        }
233
234        // Validate version
235        if !header.is_version_supported() {
236            return Err(ParxError::UnsupportedVersion {
237                major: header.version_major,
238                minor: header.version_minor,
239            });
240        }
241
242        // Parse trailer
243        let trailer_bytes: [u8; TRAILER_SIZE] = data[file_size - TRAILER_SIZE..]
244            .try_into()
245            .expect("trailer slice length verified above");
246        let trailer = Trailer::from_bytes(&trailer_bytes);
247
248        // Validate trailer magic
249        if !trailer.is_magic_valid(BUNDLE_MAGIC) {
250            return Err(ParxError::InvalidBundleMagic(trailer.magic));
251        }
252
253        // Extract and validate manifest
254        let manifest_end = file_size - TRAILER_SIZE;
255        let manifest_start = manifest_end
256            .checked_sub(trailer.manifest_len as usize)
257            .ok_or(ParxError::FileTooSmall {
258                size: file_size,
259                minimum: min_size + trailer.manifest_len as usize,
260            })?;
261
262        let manifest_bytes = &data[manifest_start..manifest_end];
263
264        // Verify manifest CRC
265        let actual_crc = crc32c::crc32c(manifest_bytes);
266        if actual_crc != trailer.manifest_crc32c {
267            return Err(ParxError::ManifestChecksumMismatch {
268                expected: trailer.manifest_crc32c,
269                actual: actual_crc,
270            });
271        }
272
273        // Decode bundle manifest
274        let bundle = ParxBundle::decode(manifest_bytes)?;
275
276        // Build path index
277        let path_index: HashMap<String, usize> = bundle
278            .entries
279            .iter()
280            .enumerate()
281            .map(|(i, e)| (e.parquet_path.clone(), i))
282            .collect();
283
284        Ok(Self {
285            header,
286            bundle,
287            data,
288            path_index,
289        })
290    }
291
292    /// Get the bundle header.
293    #[inline]
294    pub const fn header(&self) -> &BundleHeader {
295        &self.header
296    }
297
298    /// Get the number of entries in the bundle.
299    #[inline]
300    pub fn entry_count(&self) -> usize {
301        self.bundle.entries.len()
302    }
303
304    /// Get the creation timestamp in milliseconds.
305    #[inline]
306    pub const fn created_at_ms(&self) -> u64 {
307        self.bundle.created_at_ms
308    }
309
310    /// Check if a parquet path exists in the bundle.
311    #[inline]
312    pub fn contains(&self, parquet_path: &str) -> bool {
313        self.path_index.contains_key(parquet_path)
314    }
315
316    /// Get the list of parquet paths in this bundle.
317    pub fn parquet_paths(&self) -> Vec<&str> {
318        self.bundle
319            .entries
320            .iter()
321            .map(|e| e.parquet_path.as_str())
322            .collect()
323    }
324
325    /// Get footer bytes for a specific parquet file.
326    pub fn get_footer(&self, parquet_path: &str) -> Option<&[u8]> {
327        let idx = *self.path_index.get(parquet_path)?;
328        let entry = &self.bundle.entries[idx];
329
330        let start = usize::try_from(entry.footer_offset).ok()?;
331        let length = usize::try_from(entry.footer_length).ok()?;
332        let end = start
333            .checked_add(length)
334            .filter(|&end| end <= self.data.len())?;
335
336        Some(&self.data[start..end])
337    }
338
339    /// Get source size for a specific parquet file.
340    pub fn get_source_size(&self, parquet_path: &str) -> Option<u64> {
341        let idx = *self.path_index.get(parquet_path)?;
342        Some(self.bundle.entries[idx].source_size)
343    }
344
345    /// Validate that a parquet file matches the expected size.
346    pub fn validate_source_size(&self, parquet_path: &str, actual_size: u64) -> bool {
347        self.get_source_size(parquet_path)
348            .is_some_and(|expected| expected == actual_size)
349    }
350
351    /// Get a full entry reference for a parquet file.
352    pub fn get_entry(&self, parquet_path: &str) -> Option<BundleEntryRef<'_>> {
353        let idx = *self.path_index.get(parquet_path)?;
354        let entry = &self.bundle.entries[idx];
355
356        let footer_start = usize::try_from(entry.footer_offset).ok()?;
357        let footer_length = usize::try_from(entry.footer_length).ok()?;
358        let footer_end = footer_start
359            .checked_add(footer_length)
360            .filter(|&end| end <= self.data.len())?;
361
362        let footer_bytes = &self.data[footer_start..footer_end];
363        let page_index_bytes = self.resolve_page_index_bytes(entry)?;
364
365        Some(BundleEntryRef {
366            parquet_path: &entry.parquet_path,
367            source_size: entry.source_size,
368            footer_bytes,
369            page_index_bytes,
370        })
371    }
372
373    /// Iterate over all entries in the bundle.
374    pub fn iter_entries(&self) -> impl Iterator<Item = BundleEntryRef<'_>> {
375        self.bundle.entries.iter().filter_map(|entry| {
376            let footer_start = usize::try_from(entry.footer_offset).ok()?;
377            let footer_length = usize::try_from(entry.footer_length).ok()?;
378            let footer_end = footer_start
379                .checked_add(footer_length)
380                .filter(|&end| end <= self.data.len())?;
381
382            let footer_bytes = &self.data[footer_start..footer_end];
383            let page_index_bytes = self.resolve_page_index_bytes(entry)?;
384
385            Some(BundleEntryRef {
386                parquet_path: &entry.parquet_path,
387                source_size: entry.source_size,
388                footer_bytes,
389                page_index_bytes,
390            })
391        })
392    }
393
394    fn resolve_page_index_bytes<'a>(&'a self, entry: &BundleEntry) -> Option<Option<&'a [u8]>> {
395        if entry.page_index_length == 0 {
396            return Some(None);
397        }
398
399        let start = usize::try_from(entry.page_index_offset).ok()?;
400        let length = usize::try_from(entry.page_index_length).ok()?;
401        let end = start
402            .checked_add(length)
403            .filter(|&end| end <= self.data.len())?;
404        Some(Some(&self.data[start..end]))
405    }
406
407    /// Validate all entry checksums.
408    ///
409    /// # Errors
410    /// Returns error if any checksum fails or bounds are invalid.
411    pub fn validate_all(&self) -> Result<()> {
412        for entry in &self.bundle.entries {
413            // Validate footer checksum
414            let footer_start = usize::try_from(entry.footer_offset).map_err(|_| {
415                ParxError::InvalidPayloadBounds {
416                    offset: entry.footer_offset,
417                    length: entry.footer_length,
418                    file_size: self.data.len() as u64,
419                }
420            })?;
421            let footer_length = usize::try_from(entry.footer_length).map_err(|_| {
422                ParxError::InvalidPayloadBounds {
423                    offset: entry.footer_offset,
424                    length: entry.footer_length,
425                    file_size: self.data.len() as u64,
426                }
427            })?;
428            let footer_end = footer_start.checked_add(footer_length).ok_or_else(|| {
429                ParxError::InvalidPayloadBounds {
430                    offset: entry.footer_offset,
431                    length: entry.footer_length,
432                    file_size: self.data.len() as u64,
433                }
434            })?;
435
436            if footer_end > self.data.len() {
437                return Err(ParxError::InvalidPayloadBounds {
438                    offset: entry.footer_offset,
439                    length: entry.footer_length,
440                    file_size: self.data.len() as u64,
441                });
442            }
443
444            let footer_bytes = &self.data[footer_start..footer_end];
445            let footer_crc = crc32c::crc32c(footer_bytes);
446
447            if footer_crc.to_le_bytes().as_slice() != entry.footer_checksum.as_slice() {
448                return Err(ParxError::FooterChecksumMismatch);
449            }
450
451            // Validate optional page index checksum
452            if entry.page_index_length > 0 {
453                let page_index_start = usize::try_from(entry.page_index_offset).map_err(|_| {
454                    ParxError::InvalidPayloadBounds {
455                        offset: entry.page_index_offset,
456                        length: entry.page_index_length,
457                        file_size: self.data.len() as u64,
458                    }
459                })?;
460                let page_index_length = usize::try_from(entry.page_index_length).map_err(|_| {
461                    ParxError::InvalidPayloadBounds {
462                        offset: entry.page_index_offset,
463                        length: entry.page_index_length,
464                        file_size: self.data.len() as u64,
465                    }
466                })?;
467                let page_index_end =
468                    page_index_start
469                        .checked_add(page_index_length)
470                        .ok_or_else(|| ParxError::InvalidPayloadBounds {
471                            offset: entry.page_index_offset,
472                            length: entry.page_index_length,
473                            file_size: self.data.len() as u64,
474                        })?;
475
476                if page_index_end > self.data.len() {
477                    return Err(ParxError::InvalidPayloadBounds {
478                        offset: entry.page_index_offset,
479                        length: entry.page_index_length,
480                        file_size: self.data.len() as u64,
481                    });
482                }
483
484                let page_index_bytes = &self.data[page_index_start..page_index_end];
485                let page_index_crc = crc32c::crc32c(page_index_bytes);
486                if page_index_crc.to_le_bytes().as_slice() != entry.page_index_checksum.as_slice() {
487                    return Err(ParxError::PageIndexChecksumMismatch);
488                }
489            }
490        }
491
492        Ok(())
493    }
494}
495
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes three entries, reopens the bundle, and checks lookup,
    /// footer contents, and source-size validation.
    #[test]
    fn test_bundle_roundtrip() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("part-00000.parquet", 1000, b"footer0".to_vec());
        writer.add_entry("part-00001.parquet", 2000, b"footer1".to_vec());
        writer.add_entry("part-00002.parquet", 3000, b"footer2".to_vec());

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");

        assert_eq!(reader.entry_count(), 3);
        assert!(reader.contains("part-00000.parquet"));
        assert!(reader.contains("part-00001.parquet"));
        assert!(reader.contains("part-00002.parquet"));
        assert!(!reader.contains("nonexistent.parquet"));

        // Footers round-trip byte-for-byte.
        assert_eq!(
            reader.get_footer("part-00000.parquet"),
            Some(b"footer0".as_slice())
        );
        assert_eq!(
            reader.get_footer("part-00001.parquet"),
            Some(b"footer1".as_slice())
        );
        assert_eq!(
            reader.get_footer("part-00002.parquet"),
            Some(b"footer2".as_slice())
        );

        assert_eq!(reader.get_source_size("part-00000.parquet"), Some(1000));
        assert_eq!(reader.get_source_size("part-00001.parquet"), Some(2000));

        assert!(reader.validate_source_size("part-00000.parquet", 1000));
        assert!(!reader.validate_source_size("part-00000.parquet", 9999));
    }

    /// Entries without page indexes iterate in insertion order with
    /// `page_index_bytes == None`.
    #[test]
    fn test_bundle_iter_entries() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("a.parquet", 100, b"fa".to_vec());
        writer.add_entry("b.parquet", 200, b"fb".to_vec());

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");

        let entries: Vec<_> = reader.iter_entries().collect();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].parquet_path, "a.parquet");
        assert_eq!(entries[0].footer_bytes, b"fa");
        assert!(entries[0].page_index_bytes.is_none());
        assert_eq!(entries[1].parquet_path, "b.parquet");
        assert_eq!(entries[1].footer_bytes, b"fb");
        assert!(entries[1].page_index_bytes.is_none());
    }

    /// A freshly written bundle must pass full checksum validation.
    #[test]
    fn test_bundle_validate_all() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("test.parquet", 1000, b"footer".to_vec());

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");

        // Should pass validation
        reader.validate_all().expect("validation should pass");
    }

    /// `parquet_paths` preserves insertion order (no sorting).
    #[test]
    fn test_bundle_parquet_paths() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("z.parquet", 100, b"f".to_vec());
        writer.add_entry("a.parquet", 200, b"f".to_vec());
        writer.add_entry("m.parquet", 300, b"f".to_vec());

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");

        let paths = reader.parquet_paths();
        assert_eq!(paths, vec!["z.parquet", "a.parquet", "m.parquet"]);
    }

    /// A wrong magic in the first four bytes is rejected as
    /// `InvalidBundleMagic` (the 100-byte buffer clears the minimum-size
    /// check so the magic check is what fires).
    #[test]
    fn test_invalid_bundle_magic() {
        let mut data = vec![0u8; 100];
        data[0..4].copy_from_slice(b"NOPE");

        let result = ParxBundleReader::open(Bytes::from(data));
        assert!(matches!(result, Err(ParxError::InvalidBundleMagic(_))));
    }

    /// Flipping a byte inside the trailer's stored CRC field makes `open`
    /// fail the manifest checksum comparison.
    #[test]
    fn test_bundle_manifest_crc_mismatch() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("test.parquet", 1000, b"footer".to_vec());

        let mut bundle_bytes = writer.finish();
        // Corrupt manifest CRC (bytes -8 to -5 from end)
        let len = bundle_bytes.len();
        bundle_bytes[len - 8] ^= 0xFF;

        let result = ParxBundleReader::open(Bytes::from(bundle_bytes));
        assert!(matches!(
            result,
            Err(ParxError::ManifestChecksumMismatch { .. })
        ));
    }

    /// Baseline sanity check: a single-entry bundle opens and the footer is
    /// retrievable unchanged.
    #[test]
    fn test_bundle_reader_handles_normal_case() {
        // Create a simple valid bundle as baseline
        let mut writer = ParxBundleWriter::new();

        // Add a normal entry
        let footer = vec![1, 2, 3, 4];
        writer.add_entry("test.parquet", 1000, footer.clone());

        let bundle_bytes = writer.finish();

        // Should open and read successfully
        let reader = ParxBundleReader::open(bundle_bytes.into()).expect("Should open valid bundle");
        let retrieved = reader
            .get_footer("test.parquet")
            .expect("Should find footer");
        assert_eq!(retrieved, &footer[..]);
    }

    /// Page-index payloads round-trip through `get_entry` and pass
    /// checksum validation.
    #[test]
    fn test_bundle_with_page_indexes_roundtrip() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry_with_page_indexes(
            "test.parquet",
            1000,
            b"footer".to_vec(),
            b"pi".to_vec(),
        );

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(bundle_bytes.into()).expect("Should open valid bundle");
        let entry = reader
            .get_entry("test.parquet")
            .expect("Should find bundle entry");

        assert_eq!(entry.footer_bytes, b"footer");
        assert_eq!(entry.page_index_bytes, Some(b"pi".as_slice()));
        reader.validate_all().expect("validation should pass");
    }
}