1use crate::error::{ParxError, Result};
18use crate::format::{BundleHeader, Trailer, BUNDLE_HEADER_SIZE, BUNDLE_MAGIC, TRAILER_SIZE};
19use crate::proto::{BundleEntry, ParxBundle};
20use bytes::Bytes;
21use prost::Message;
22use std::collections::HashMap;
23use std::time::{SystemTime, UNIX_EPOCH};
24
/// File name for a serialized Parx bundle.
pub const BUNDLE_FILENAME: &str = "_parx_bundle.parx";
27
#[derive(Debug, Clone)]
/// Owned data for one entry queued in a [`ParxBundleWriter`].
pub struct BundleEntryData {
    /// Path of the Parquet file this entry describes; readers use it as the lookup key.
    pub parquet_path: String,
    /// Size of the source Parquet file as supplied by the caller (presumably in bytes —
    /// TODO confirm); stored verbatim in the manifest.
    pub source_size: u64,
    /// Raw footer bytes copied into the bundle payload and covered by a CRC32C checksum.
    pub footer_bytes: Bytes,
    /// Raw page-index bytes; empty means the entry has no page-index section.
    pub page_index_bytes: Bytes,
}
40
#[derive(Debug)]
/// Accumulates per-file entries and serializes them into a single bundle buffer
/// via [`ParxBundleWriter::finish`].
pub struct ParxBundleWriter {
    // Entries in insertion order; `finish` lays out payloads in this same order.
    entries: Vec<BundleEntryData>,
}
48
49impl ParxBundleWriter {
50 pub const fn new() -> Self {
52 Self {
53 entries: Vec::new(),
54 }
55 }
56
57 pub fn add_entry(
59 &mut self,
60 parquet_path: &str,
61 source_size: u64,
62 footer_bytes: impl Into<Bytes>,
63 ) {
64 self.add_entry_with_page_indexes(parquet_path, source_size, footer_bytes, Bytes::new());
65 }
66
67 pub fn add_entry_with_page_indexes(
69 &mut self,
70 parquet_path: &str,
71 source_size: u64,
72 footer_bytes: impl Into<Bytes>,
73 page_index_bytes: impl Into<Bytes>,
74 ) {
75 self.entries.push(BundleEntryData {
76 parquet_path: parquet_path.to_string(),
77 source_size,
78 footer_bytes: footer_bytes.into(),
79 page_index_bytes: page_index_bytes.into(),
80 });
81 }
82
83 #[inline]
85 pub fn entry_count(&self) -> usize {
86 self.entries.len()
87 }
88
89 pub fn finish(self) -> Vec<u8> {
94 let header = BundleHeader::new(self.entries.len() as u64);
95 let header_bytes = header.to_bytes();
96
97 let mut current_offset = BUNDLE_HEADER_SIZE as u64;
98 let mut payload = Vec::new();
99 let mut bundle_entries = Vec::new();
100
101 for entry in &self.entries {
102 let footer_offset = current_offset;
104 let footer_length = entry.footer_bytes.len() as u64;
105 let footer_checksum = crc32c::crc32c(&entry.footer_bytes).to_le_bytes().to_vec();
106
107 payload.extend_from_slice(&entry.footer_bytes);
108 current_offset += footer_length;
109
110 let (page_index_offset, page_index_length, page_index_checksum) =
112 if entry.page_index_bytes.is_empty() {
113 (0, 0, Vec::new())
114 } else {
115 let page_index_offset = current_offset;
116 let page_index_length = entry.page_index_bytes.len() as u64;
117 let page_index_checksum = crc32c::crc32c(&entry.page_index_bytes)
118 .to_le_bytes()
119 .to_vec();
120 payload.extend_from_slice(&entry.page_index_bytes);
121 current_offset += page_index_length;
122 (page_index_offset, page_index_length, page_index_checksum)
123 };
124
125 bundle_entries.push(BundleEntry {
126 parquet_path: entry.parquet_path.clone(),
127 source_size: entry.source_size,
128 footer_offset,
129 footer_length,
130 footer_checksum,
131 page_index_offset,
132 page_index_length,
133 page_index_checksum,
134 });
135 }
136
137 #[allow(clippy::cast_possible_truncation)]
139 let created_at_ms = SystemTime::now()
140 .duration_since(UNIX_EPOCH)
141 .map(|d| d.as_millis() as u64)
142 .unwrap_or(0);
143
144 let bundle = ParxBundle {
146 version: 1,
147 created_at_ms,
148 entries: bundle_entries,
149 };
150
151 let manifest_bytes = bundle.encode_to_vec();
152 let manifest_crc = crc32c::crc32c(&manifest_bytes);
153
154 let manifest_len = u32::try_from(manifest_bytes.len()).expect("manifest too large (>4GB)");
156 let trailer = Trailer::new(manifest_len, manifest_crc, BUNDLE_MAGIC);
157 let trailer_bytes = trailer.to_bytes();
158
159 let total_size =
161 header_bytes.len() + payload.len() + manifest_bytes.len() + trailer_bytes.len();
162
163 let mut output = Vec::with_capacity(total_size);
164 output.extend_from_slice(&header_bytes);
165 output.extend_from_slice(&payload);
166 output.extend_from_slice(&manifest_bytes);
167 output.extend_from_slice(&trailer_bytes);
168
169 output
170 }
171}
172
173impl Default for ParxBundleWriter {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
#[derive(Debug, Clone)]
/// Borrowed view of one bundle entry, with byte slices pointing into the reader's buffer.
pub struct BundleEntryRef<'a> {
    /// Path of the Parquet file this entry describes.
    pub parquet_path: &'a str,
    /// Source file size recorded by the writer (presumably in bytes — TODO confirm).
    pub source_size: u64,
    /// Footer bytes for this entry (checksum not verified on access).
    pub footer_bytes: &'a [u8],
    /// Page-index bytes, or `None` when the manifest records a zero page-index length.
    pub page_index_bytes: Option<&'a [u8]>,
}
191
#[derive(Debug, Clone)]
/// Read-only accessor over a serialized bundle held in memory.
///
/// Construct with [`ParxBundleReader::open`], which validates framing and the manifest
/// checksum; per-entry payload checksums are checked only by `validate_all`.
pub struct ParxBundleReader {
    // Parsed fixed-size header from the start of `data`.
    header: BundleHeader,
    // Decoded manifest (entry list with offsets/lengths/checksums).
    bundle: ParxBundle,
    // The entire bundle buffer; entry offsets index into it.
    data: Bytes,
    // parquet_path -> index into `bundle.entries`. Built with `collect`, so if the
    // manifest contains a path more than once the last occurrence wins.
    path_index: HashMap<String, usize>,
}
203
204impl ParxBundleReader {
205 pub fn open(data: Bytes) -> Result<Self> {
213 let file_size = data.len();
214 let min_size = BUNDLE_HEADER_SIZE + TRAILER_SIZE;
215
216 if file_size < min_size {
217 return Err(ParxError::FileTooSmall {
218 size: file_size,
219 minimum: min_size,
220 });
221 }
222
223 let header_bytes: [u8; BUNDLE_HEADER_SIZE] = data[..BUNDLE_HEADER_SIZE]
225 .try_into()
226 .expect("header slice length verified above");
227 let header = BundleHeader::from_bytes(&header_bytes);
228
229 if !header.is_magic_valid() {
231 return Err(ParxError::InvalidBundleMagic(header.magic));
232 }
233
234 if !header.is_version_supported() {
236 return Err(ParxError::UnsupportedVersion {
237 major: header.version_major,
238 minor: header.version_minor,
239 });
240 }
241
242 let trailer_bytes: [u8; TRAILER_SIZE] = data[file_size - TRAILER_SIZE..]
244 .try_into()
245 .expect("trailer slice length verified above");
246 let trailer = Trailer::from_bytes(&trailer_bytes);
247
248 if !trailer.is_magic_valid(BUNDLE_MAGIC) {
250 return Err(ParxError::InvalidBundleMagic(trailer.magic));
251 }
252
253 let manifest_end = file_size - TRAILER_SIZE;
255 let manifest_start = manifest_end
256 .checked_sub(trailer.manifest_len as usize)
257 .ok_or(ParxError::FileTooSmall {
258 size: file_size,
259 minimum: min_size + trailer.manifest_len as usize,
260 })?;
261
262 let manifest_bytes = &data[manifest_start..manifest_end];
263
264 let actual_crc = crc32c::crc32c(manifest_bytes);
266 if actual_crc != trailer.manifest_crc32c {
267 return Err(ParxError::ManifestChecksumMismatch {
268 expected: trailer.manifest_crc32c,
269 actual: actual_crc,
270 });
271 }
272
273 let bundle = ParxBundle::decode(manifest_bytes)?;
275
276 let path_index: HashMap<String, usize> = bundle
278 .entries
279 .iter()
280 .enumerate()
281 .map(|(i, e)| (e.parquet_path.clone(), i))
282 .collect();
283
284 Ok(Self {
285 header,
286 bundle,
287 data,
288 path_index,
289 })
290 }
291
292 #[inline]
294 pub const fn header(&self) -> &BundleHeader {
295 &self.header
296 }
297
298 #[inline]
300 pub fn entry_count(&self) -> usize {
301 self.bundle.entries.len()
302 }
303
304 #[inline]
306 pub const fn created_at_ms(&self) -> u64 {
307 self.bundle.created_at_ms
308 }
309
310 #[inline]
312 pub fn contains(&self, parquet_path: &str) -> bool {
313 self.path_index.contains_key(parquet_path)
314 }
315
316 pub fn parquet_paths(&self) -> Vec<&str> {
318 self.bundle
319 .entries
320 .iter()
321 .map(|e| e.parquet_path.as_str())
322 .collect()
323 }
324
325 pub fn get_footer(&self, parquet_path: &str) -> Option<&[u8]> {
327 let idx = *self.path_index.get(parquet_path)?;
328 let entry = &self.bundle.entries[idx];
329
330 let start = usize::try_from(entry.footer_offset).ok()?;
331 let length = usize::try_from(entry.footer_length).ok()?;
332 let end = start
333 .checked_add(length)
334 .filter(|&end| end <= self.data.len())?;
335
336 Some(&self.data[start..end])
337 }
338
339 pub fn get_source_size(&self, parquet_path: &str) -> Option<u64> {
341 let idx = *self.path_index.get(parquet_path)?;
342 Some(self.bundle.entries[idx].source_size)
343 }
344
345 pub fn validate_source_size(&self, parquet_path: &str, actual_size: u64) -> bool {
347 self.get_source_size(parquet_path)
348 .is_some_and(|expected| expected == actual_size)
349 }
350
351 pub fn get_entry(&self, parquet_path: &str) -> Option<BundleEntryRef<'_>> {
353 let idx = *self.path_index.get(parquet_path)?;
354 let entry = &self.bundle.entries[idx];
355
356 let footer_start = usize::try_from(entry.footer_offset).ok()?;
357 let footer_length = usize::try_from(entry.footer_length).ok()?;
358 let footer_end = footer_start
359 .checked_add(footer_length)
360 .filter(|&end| end <= self.data.len())?;
361
362 let footer_bytes = &self.data[footer_start..footer_end];
363 let page_index_bytes = self.resolve_page_index_bytes(entry)?;
364
365 Some(BundleEntryRef {
366 parquet_path: &entry.parquet_path,
367 source_size: entry.source_size,
368 footer_bytes,
369 page_index_bytes,
370 })
371 }
372
373 pub fn iter_entries(&self) -> impl Iterator<Item = BundleEntryRef<'_>> {
375 self.bundle.entries.iter().filter_map(|entry| {
376 let footer_start = usize::try_from(entry.footer_offset).ok()?;
377 let footer_length = usize::try_from(entry.footer_length).ok()?;
378 let footer_end = footer_start
379 .checked_add(footer_length)
380 .filter(|&end| end <= self.data.len())?;
381
382 let footer_bytes = &self.data[footer_start..footer_end];
383 let page_index_bytes = self.resolve_page_index_bytes(entry)?;
384
385 Some(BundleEntryRef {
386 parquet_path: &entry.parquet_path,
387 source_size: entry.source_size,
388 footer_bytes,
389 page_index_bytes,
390 })
391 })
392 }
393
394 fn resolve_page_index_bytes<'a>(&'a self, entry: &BundleEntry) -> Option<Option<&'a [u8]>> {
395 if entry.page_index_length == 0 {
396 return Some(None);
397 }
398
399 let start = usize::try_from(entry.page_index_offset).ok()?;
400 let length = usize::try_from(entry.page_index_length).ok()?;
401 let end = start
402 .checked_add(length)
403 .filter(|&end| end <= self.data.len())?;
404 Some(Some(&self.data[start..end]))
405 }
406
407 pub fn validate_all(&self) -> Result<()> {
412 for entry in &self.bundle.entries {
413 let footer_start = usize::try_from(entry.footer_offset).map_err(|_| {
415 ParxError::InvalidPayloadBounds {
416 offset: entry.footer_offset,
417 length: entry.footer_length,
418 file_size: self.data.len() as u64,
419 }
420 })?;
421 let footer_length = usize::try_from(entry.footer_length).map_err(|_| {
422 ParxError::InvalidPayloadBounds {
423 offset: entry.footer_offset,
424 length: entry.footer_length,
425 file_size: self.data.len() as u64,
426 }
427 })?;
428 let footer_end = footer_start.checked_add(footer_length).ok_or_else(|| {
429 ParxError::InvalidPayloadBounds {
430 offset: entry.footer_offset,
431 length: entry.footer_length,
432 file_size: self.data.len() as u64,
433 }
434 })?;
435
436 if footer_end > self.data.len() {
437 return Err(ParxError::InvalidPayloadBounds {
438 offset: entry.footer_offset,
439 length: entry.footer_length,
440 file_size: self.data.len() as u64,
441 });
442 }
443
444 let footer_bytes = &self.data[footer_start..footer_end];
445 let footer_crc = crc32c::crc32c(footer_bytes);
446
447 if footer_crc.to_le_bytes().as_slice() != entry.footer_checksum.as_slice() {
448 return Err(ParxError::FooterChecksumMismatch);
449 }
450
451 if entry.page_index_length > 0 {
453 let page_index_start = usize::try_from(entry.page_index_offset).map_err(|_| {
454 ParxError::InvalidPayloadBounds {
455 offset: entry.page_index_offset,
456 length: entry.page_index_length,
457 file_size: self.data.len() as u64,
458 }
459 })?;
460 let page_index_length = usize::try_from(entry.page_index_length).map_err(|_| {
461 ParxError::InvalidPayloadBounds {
462 offset: entry.page_index_offset,
463 length: entry.page_index_length,
464 file_size: self.data.len() as u64,
465 }
466 })?;
467 let page_index_end =
468 page_index_start
469 .checked_add(page_index_length)
470 .ok_or_else(|| ParxError::InvalidPayloadBounds {
471 offset: entry.page_index_offset,
472 length: entry.page_index_length,
473 file_size: self.data.len() as u64,
474 })?;
475
476 if page_index_end > self.data.len() {
477 return Err(ParxError::InvalidPayloadBounds {
478 offset: entry.page_index_offset,
479 length: entry.page_index_length,
480 file_size: self.data.len() as u64,
481 });
482 }
483
484 let page_index_bytes = &self.data[page_index_start..page_index_end];
485 let page_index_crc = crc32c::crc32c(page_index_bytes);
486 if page_index_crc.to_le_bytes().as_slice() != entry.page_index_checksum.as_slice() {
487 return Err(ParxError::PageIndexChecksumMismatch);
488 }
489 }
490 }
491
492 Ok(())
493 }
494}
495
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bundle_roundtrip() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("part-00000.parquet", 1000, b"footer0".to_vec());
        writer.add_entry("part-00001.parquet", 2000, b"footer1".to_vec());
        writer.add_entry("part-00002.parquet", 3000, b"footer2".to_vec());

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");

        assert_eq!(reader.entry_count(), 3);
        assert!(reader.contains("part-00000.parquet"));
        assert!(reader.contains("part-00001.parquet"));
        assert!(reader.contains("part-00002.parquet"));
        assert!(!reader.contains("nonexistent.parquet"));

        assert_eq!(
            reader.get_footer("part-00000.parquet"),
            Some(b"footer0".as_slice())
        );
        assert_eq!(
            reader.get_footer("part-00001.parquet"),
            Some(b"footer1".as_slice())
        );
        assert_eq!(
            reader.get_footer("part-00002.parquet"),
            Some(b"footer2".as_slice())
        );

        assert_eq!(reader.get_source_size("part-00000.parquet"), Some(1000));
        assert_eq!(reader.get_source_size("part-00001.parquet"), Some(2000));

        assert!(reader.validate_source_size("part-00000.parquet", 1000));
        assert!(!reader.validate_source_size("part-00000.parquet", 9999));
    }

    #[test]
    fn test_bundle_iter_entries() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("a.parquet", 100, b"fa".to_vec());
        writer.add_entry("b.parquet", 200, b"fb".to_vec());

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");

        let entries: Vec<_> = reader.iter_entries().collect();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].parquet_path, "a.parquet");
        assert_eq!(entries[0].footer_bytes, b"fa");
        assert!(entries[0].page_index_bytes.is_none());
        assert_eq!(entries[1].parquet_path, "b.parquet");
        assert_eq!(entries[1].footer_bytes, b"fb");
        assert!(entries[1].page_index_bytes.is_none());
    }

    #[test]
    fn test_bundle_validate_all() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("test.parquet", 1000, b"footer".to_vec());

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");

        reader.validate_all().expect("validation should pass");
    }

    #[test]
    fn test_bundle_parquet_paths() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("z.parquet", 100, b"f".to_vec());
        writer.add_entry("a.parquet", 200, b"f".to_vec());
        writer.add_entry("m.parquet", 300, b"f".to_vec());

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("failed to open");

        let paths = reader.parquet_paths();
        assert_eq!(paths, vec!["z.parquet", "a.parquet", "m.parquet"]);
    }

    #[test]
    fn test_invalid_bundle_magic() {
        let mut data = vec![0u8; 100];
        data[0..4].copy_from_slice(b"NOPE");

        let result = ParxBundleReader::open(Bytes::from(data));
        assert!(matches!(result, Err(ParxError::InvalidBundleMagic(_))));
    }

    #[test]
    fn test_open_rejects_truncated_input() {
        // A single byte cannot hold both a header and a trailer.
        let result = ParxBundleReader::open(Bytes::from(vec![0u8; 1]));
        assert!(matches!(result, Err(ParxError::FileTooSmall { .. })));
    }

    #[test]
    fn test_bundle_manifest_crc_mismatch() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("test.parquet", 1000, b"footer".to_vec());

        let mut bundle_bytes = writer.finish();
        let len = bundle_bytes.len();
        bundle_bytes[len - 8] ^= 0xFF;

        let result = ParxBundleReader::open(Bytes::from(bundle_bytes));
        assert!(matches!(
            result,
            Err(ParxError::ManifestChecksumMismatch { .. })
        ));
    }

    #[test]
    fn test_validate_all_detects_footer_corruption() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("test.parquet", 1000, b"footer".to_vec());

        let mut bundle_bytes = writer.finish();
        // The first footer is written immediately after the header; the manifest CRC does
        // not cover payload bytes, so `open` still succeeds.
        bundle_bytes[BUNDLE_HEADER_SIZE] ^= 0xFF;

        let reader =
            ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("manifest is intact");
        assert!(matches!(
            reader.validate_all(),
            Err(ParxError::FooterChecksumMismatch)
        ));
    }

    #[test]
    fn test_validate_all_detects_page_index_corruption() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry_with_page_indexes(
            "test.parquet",
            1000,
            b"footer".to_vec(),
            b"pi".to_vec(),
        );

        let mut bundle_bytes = writer.finish();
        // The page index follows the footer directly in the payload.
        bundle_bytes[BUNDLE_HEADER_SIZE + b"footer".len()] ^= 0xFF;

        let reader =
            ParxBundleReader::open(Bytes::from(bundle_bytes)).expect("manifest is intact");
        assert!(matches!(
            reader.validate_all(),
            Err(ParxError::PageIndexChecksumMismatch)
        ));
    }

    #[test]
    fn test_bundle_reader_handles_normal_case() {
        let mut writer = ParxBundleWriter::new();

        let footer = vec![1, 2, 3, 4];
        writer.add_entry("test.parquet", 1000, footer.clone());

        let bundle_bytes = writer.finish();

        let reader = ParxBundleReader::open(bundle_bytes.into()).expect("Should open valid bundle");
        let retrieved = reader
            .get_footer("test.parquet")
            .expect("Should find footer");
        assert_eq!(retrieved, &footer[..]);
    }

    #[test]
    fn test_bundle_with_page_indexes_roundtrip() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry_with_page_indexes(
            "test.parquet",
            1000,
            b"footer".to_vec(),
            b"pi".to_vec(),
        );

        let bundle_bytes = writer.finish();
        let reader = ParxBundleReader::open(bundle_bytes.into()).expect("Should open valid bundle");
        let entry = reader
            .get_entry("test.parquet")
            .expect("Should find bundle entry");

        assert_eq!(entry.footer_bytes, b"footer");
        assert_eq!(entry.page_index_bytes, Some(b"pi".as_slice()));
        reader.validate_all().expect("validation should pass");
    }

    #[test]
    fn test_get_entry_missing_path() {
        let mut writer = ParxBundleWriter::new();
        writer.add_entry("test.parquet", 1000, b"footer".to_vec());

        let reader =
            ParxBundleReader::open(Bytes::from(writer.finish())).expect("failed to open");
        assert!(reader.get_entry("missing.parquet").is_none());
        assert!(reader.get_footer("missing.parquet").is_none());
        assert!(reader.get_source_size("missing.parquet").is_none());
        assert!(!reader.validate_source_size("missing.parquet", 1000));
    }
}