1use crate::compression::{self, should_auto_compress};
19use crate::error::{ParxError, Result};
20use crate::format::{Compression, Header, Trailer, HEADER_SIZE, MAGIC};
21use crate::proto::ParxManifest;
22use bytes::Bytes;
23use prost::Message;
24use std::fs::File;
25use std::io::{Read, Seek, SeekFrom};
26use std::path::Path;
27use std::time::{SystemTime, UNIX_EPOCH};
28
#[derive(Debug)]
/// Builder that packages a Parquet file's footer (and optionally its page
/// indexes) into a PARX container together with provenance metadata.
///
/// Populate via the `set_*` methods or the `from_parquet_*` constructors,
/// then call [`ParxWriter::finish`] to serialize the container.
pub struct ParxWriter {
    /// URI of the source Parquet file recorded in the manifest (may be empty).
    source_uri: String,
    /// Size in bytes of the source Parquet file.
    source_size: u64,
    /// Raw (uncompressed) Parquet footer bytes.
    footer_bytes: Bytes,
    /// Compression applied to stored payloads; `None` stores them verbatim.
    compression: Option<Compression>,
    /// Raw (uncompressed) Parquet page-index bytes; empty when absent.
    page_index_bytes: Bytes,
}
40
impl ParxWriter {
    /// Creates an empty writer; populate it with the `set_*` methods.
    #[inline]
    pub fn new() -> Self {
        Self {
            source_uri: String::new(),
            source_size: 0,
            footer_bytes: Bytes::new(),
            compression: None,
            page_index_bytes: Bytes::new(),
        }
    }

    /// Builds a writer from a complete Parquet file held in memory.
    ///
    /// Captures the footer bytes and the source size; the source URI is left
    /// empty (use [`ParxWriter::set_source_uri`] to record one).
    ///
    /// # Errors
    ///
    /// * [`ParxError::FileTooSmall`] — `data` is shorter than the 12-byte
    ///   Parquet minimum (head magic + length word + tail magic).
    /// * [`ParxError::InvalidParquetMagic`] — head or tail bytes are not `PAR1`.
    /// * [`ParxError::InvalidParquetFooterLength`] — the trailing length word
    ///   describes a footer that cannot fit inside the file.
    pub fn from_parquet_bytes(data: &[u8]) -> Result<Self> {
        // Smallest legal Parquet file: 4-byte head magic + 4-byte footer
        // length word + 4-byte tail magic (i.e. an empty footer).
        if data.len() < 12 {
            return Err(ParxError::FileTooSmall {
                size: data.len(),
                minimum: 12,
            });
        }

        // First 4 bytes must be the Parquet head magic.
        let mut head_magic = [0u8; 4];
        head_magic.copy_from_slice(&data[0..4]);
        if &head_magic != b"PAR1" {
            return Err(ParxError::InvalidParquetMagic(head_magic));
        }

        // Last 4 bytes must be the Parquet tail magic.
        let mut tail_magic = [0u8; 4];
        tail_magic.copy_from_slice(&data[data.len() - 4..]);
        if &tail_magic != b"PAR1" {
            return Err(ParxError::InvalidParquetMagic(tail_magic));
        }

        // Little-endian u32 footer length sits just before the tail magic.
        let footer_len = u32::from_le_bytes([
            data[data.len() - 8],
            data[data.len() - 7],
            data[data.len() - 6],
            data[data.len() - 5],
        ]) as u64;

        let file_size = data.len() as u64;
        // The footer plus the 12 fixed bytes must fit within the file; this
        // also guarantees the slice arithmetic below cannot underflow.
        if footer_len + 12 > file_size {
            return Err(ParxError::InvalidParquetFooterLength {
                footer_len,
                file_size,
            });
        }

        // The footer occupies the bytes immediately before the length word.
        let footer_start = data.len() - 8 - footer_len as usize;
        let footer_bytes = &data[footer_start..data.len() - 8];

        let mut writer = Self::new();
        writer.set_source_size(file_size);
        writer.set_footer(footer_bytes);
        Ok(writer)
    }

    /// Builds a writer by reading the footer from a Parquet file on disk,
    /// recording `path` as the source URI. Only the head magic, trailing
    /// 8 bytes, and footer are read — not the whole file.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors and the same validation errors as
    /// [`ParxWriter::from_parquet_bytes`].
    pub fn from_parquet_file(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();
        let (file_size, footer_bytes) = read_parquet_footer_from_file(path)?;
        let mut writer = Self::new();
        writer.set_source_size(file_size);
        writer.set_footer_owned(footer_bytes);
        writer.set_source_uri(path.display().to_string());
        Ok(writer)
    }

    /// Sets the source URI recorded in the manifest.
    #[inline]
    pub fn set_source_uri(&mut self, uri: impl Into<String>) {
        self.source_uri = uri.into();
    }

    /// Sets the size in bytes of the source Parquet file.
    #[inline]
    pub fn set_source_size(&mut self, size: u64) {
        self.source_size = size;
    }

    /// Stores a copy of the raw (uncompressed) Parquet footer bytes.
    #[inline]
    pub fn set_footer(&mut self, bytes: &[u8]) {
        self.footer_bytes = Bytes::copy_from_slice(bytes);
    }

    /// Stores the raw footer bytes without copying when the caller already
    /// owns them (e.g. a `Vec<u8>` or `Bytes`).
    #[inline]
    pub fn set_footer_owned(&mut self, bytes: impl Into<Bytes>) {
        self.footer_bytes = bytes.into();
    }

    /// Requests that stored payloads be compressed with `compression`.
    #[inline]
    pub fn set_compression(&mut self, compression: Compression) {
        self.compression = Some(compression);
    }

    /// Disables compression; payloads will be stored verbatim.
    #[inline]
    pub fn clear_compression(&mut self) {
        self.compression = None;
    }

    /// Enables Zstd compression when the current footer is large enough for
    /// it to pay off, per [`should_auto_compress`]. Does not modify the
    /// compression setting when the footer is below the threshold.
    #[inline]
    pub fn auto_compress(&mut self) {
        if should_auto_compress(self.footer_bytes.len()) {
            self.compression = Some(Compression::Zstd);
        }
    }

    /// Returns the currently configured compression, if any.
    #[inline]
    pub const fn compression(&self) -> Option<Compression> {
        self.compression
    }

    /// Returns the recorded source file size in bytes.
    #[inline]
    pub const fn source_size(&self) -> u64 {
        self.source_size
    }

    /// Returns the size in bytes of the stored (uncompressed) footer.
    #[inline]
    pub fn footer_size(&self) -> usize {
        self.footer_bytes.len()
    }

    /// Stores a copy of the raw Parquet page-index bytes.
    #[inline]
    pub fn set_page_indexes(&mut self, bytes: &[u8]) {
        self.page_index_bytes = Bytes::copy_from_slice(bytes);
    }

    /// Stores the raw page-index bytes without copying when already owned.
    #[inline]
    pub fn set_page_indexes_owned(&mut self, bytes: impl Into<Bytes>) {
        self.page_index_bytes = bytes.into();
    }

    /// Returns `true` when page-index bytes have been provided.
    #[inline]
    pub fn has_page_indexes(&self) -> bool {
        !self.page_index_bytes.is_empty()
    }

    /// Serializes the PARX container and returns its bytes.
    ///
    /// Output layout: header | footer payload | page-index payload (optional)
    /// | protobuf manifest | trailer.
    ///
    /// # Panics
    ///
    /// * If compression fails (not expected on valid input).
    /// * If the encoded manifest exceeds `u32::MAX` bytes.
    pub fn finish(self) -> Vec<u8> {
        let mut header = Header::new();

        // CRC32C (little-endian) of the *uncompressed* footer bytes, so the
        // original Parquet footer can be verified regardless of how the
        // payload is stored.
        let source_footer_checksum = crc32c::crc32c(&self.footer_bytes).to_le_bytes().to_vec();

        // Compress the footer if requested; an uncompressed-size of 0 marks
        // "stored verbatim" in the manifest.
        let (footer_payload, footer_uncompressed_size) = match self.compression {
            Some(algo) => {
                header.set_compression(algo);
                let compressed = compression::compress(&self.footer_bytes, algo)
                    .expect("compression should not fail on valid data");
                (compressed, self.footer_bytes.len() as u64)
            }
            None => (self.footer_bytes.to_vec(), 0),
        };

        // CRC32C of the payload exactly as stored (i.e. post-compression).
        let footer_checksum = crc32c::crc32c(&footer_payload).to_le_bytes().to_vec();

        // The footer payload starts immediately after the fixed-size header.
        let footer_offset = HEADER_SIZE as u64;
        let footer_length = footer_payload.len() as u64;

        // Page indexes (if any) follow the footer payload and use the same
        // compression setting; again, uncompressed-size 0 == stored verbatim.
        let page_index_offset = footer_offset + footer_length;
        let (page_index_payload, page_index_uncompressed_size) = if self.page_index_bytes.is_empty()
        {
            (Vec::new(), 0)
        } else {
            match self.compression {
                Some(algo) => {
                    let compressed = compression::compress(&self.page_index_bytes, algo)
                        .expect("compression should not fail on valid data");
                    (compressed, self.page_index_bytes.len() as u64)
                }
                None => (self.page_index_bytes.to_vec(), 0),
            }
        };

        let page_index_length = page_index_payload.len() as u64;
        // No checksum is recorded when there is no page-index payload.
        let page_index_checksum = if page_index_length > 0 {
            crc32c::crc32c(&page_index_payload).to_le_bytes().to_vec()
        } else {
            Vec::new()
        };

        // Milliseconds since the Unix epoch; a pre-epoch clock degrades to 0
        // rather than failing the write.
        #[allow(clippy::cast_possible_truncation)]
        let created_at_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        // Serialize the header only after the compression flag is final.
        let header_bytes = header.to_bytes();

        let manifest = ParxManifest {
            version: 1,
            source_uri: self.source_uri,
            source_size: self.source_size,
            source_footer_checksum,
            footer_offset,
            footer_length,
            footer_checksum,
            created_at_ms,
            footer_uncompressed_size,
            page_index_offset,
            page_index_length,
            page_index_checksum,
            page_index_uncompressed_size,
        };

        // The trailer records the manifest length and CRC so a reader can
        // locate and validate the manifest from the end of the file.
        let manifest_bytes = manifest.encode_to_vec();
        let manifest_crc = crc32c::crc32c(&manifest_bytes);

        let manifest_len = u32::try_from(manifest_bytes.len()).expect("manifest too large (>4GB)");
        let trailer = Trailer::new(manifest_len, manifest_crc, MAGIC);
        let trailer_bytes = trailer.to_bytes();

        // Assemble all sections with a single exact-size allocation.
        let total_size = header_bytes.len()
            + footer_payload.len()
            + page_index_payload.len()
            + manifest_bytes.len()
            + trailer_bytes.len();

        let mut output = Vec::with_capacity(total_size);
        output.extend_from_slice(&header_bytes);
        output.extend_from_slice(&footer_payload);
        output.extend_from_slice(&page_index_payload);
        output.extend_from_slice(&manifest_bytes);
        output.extend_from_slice(&trailer_bytes);

        output
    }
}
318
319impl Default for ParxWriter {
320 fn default() -> Self {
321 Self::new()
322 }
323}
324
325fn read_parquet_footer_from_file(path: &Path) -> Result<(u64, Bytes)> {
326 let mut file = File::open(path)?;
327 let file_size = file.metadata()?.len();
328
329 if file_size < 12 {
330 return Err(ParxError::FileTooSmall {
331 size: usize::try_from(file_size).unwrap_or(usize::MAX),
332 minimum: 12,
333 });
334 }
335
336 let mut head_magic = [0u8; 4];
337 file.read_exact(&mut head_magic)?;
338 if &head_magic != b"PAR1" {
339 return Err(ParxError::InvalidParquetMagic(head_magic));
340 }
341
342 file.seek(SeekFrom::End(-8))?;
343 let mut footer_trailer = [0u8; 8];
344 file.read_exact(&mut footer_trailer)?;
345
346 let footer_len = u32::from_le_bytes(footer_trailer[..4].try_into().expect("slice len")) as u64;
347 let tail_magic: [u8; 4] = footer_trailer[4..8].try_into().expect("slice len");
348 if &tail_magic != b"PAR1" {
349 return Err(ParxError::InvalidParquetMagic(tail_magic));
350 }
351
352 if footer_len + 12 > file_size {
353 return Err(ParxError::InvalidParquetFooterLength {
354 footer_len,
355 file_size,
356 });
357 }
358
359 let footer_start = file_size - 8 - footer_len;
360 file.seek(SeekFrom::Start(footer_start))?;
361 let mut footer = vec![0u8; footer_len as usize];
362 file.read_exact(&mut footer)?;
363
364 Ok((file_size, Bytes::from(footer)))
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the smallest well-formed Parquet file used as a fixture:
    /// head magic, 3-byte footer "abc", footer length word, tail magic.
    fn valid_parquet_bytes() -> Vec<u8> {
        let footer = b"abc";
        let mut data = Vec::new();
        data.extend_from_slice(b"PAR1");
        data.extend_from_slice(footer);
        data.extend_from_slice(&(footer.len() as u32).to_le_bytes());
        data.extend_from_slice(b"PAR1");
        data
    }

    /// `finish` output must begin with the PARX header magic and end with
    /// the trailer magic.
    #[test]
    fn test_writer_creates_valid_structure() {
        let mut writer = ParxWriter::new();
        writer.set_source_size(1000);
        writer.set_footer(b"test footer");

        let bytes = writer.finish();

        assert_eq!(&bytes[0..4], b"PARX");

        assert_eq!(&bytes[bytes.len() - 4..], b"PARX");
    }

    /// The zero-copy footer setter produces the same valid container shape.
    #[test]
    fn test_set_footer_owned() {
        let footer = vec![1, 2, 3, 4, 5];
        let mut writer = ParxWriter::new();
        writer.set_source_size(100);
        writer.set_footer_owned(footer);

        let bytes = writer.finish();
        assert_eq!(&bytes[0..4], b"PARX");
    }

    /// Explicit Zstd compression must be reflected in the serialized header.
    #[test]
    fn test_writer_with_compression() {
        let footer = b"test footer data that will be compressed".repeat(100);
        let mut writer = ParxWriter::new();
        writer.set_source_size(1000);
        writer.set_footer(&footer);
        writer.set_compression(Compression::Zstd);

        let bytes = writer.finish();

        assert_eq!(&bytes[0..4], b"PARX");

        let header = Header::from_bytes(bytes[..HEADER_SIZE].try_into().unwrap());
        assert!(header.is_footer_compressed());
        assert_eq!(header.compression_algorithm(), Some(Compression::Zstd));
    }

    /// `auto_compress` leaves small footers alone and enables Zstd for
    /// footers past the auto-compression threshold.
    #[test]
    fn test_auto_compress() {
        let mut writer = ParxWriter::new();
        writer.set_footer(b"small");
        writer.auto_compress();
        assert!(writer.compression.is_none());

        let mut writer = ParxWriter::new();
        writer.set_footer(&vec![0u8; 20_000]);
        writer.auto_compress();
        assert_eq!(writer.compression, Some(Compression::Zstd));
    }

    /// Parsing a valid in-memory file captures the size and footer bytes.
    #[test]
    fn test_from_parquet_bytes() {
        let data = valid_parquet_bytes();
        let writer = ParxWriter::from_parquet_bytes(&data).unwrap();

        assert_eq!(writer.source_size, data.len() as u64);
        assert_eq!(writer.footer_bytes, Bytes::from_static(b"abc"));
    }

    /// A corrupted head magic is reported with the offending bytes.
    #[test]
    fn test_from_parquet_bytes_invalid_magic() {
        let mut data = valid_parquet_bytes();
        data[0..4].copy_from_slice(b"XXXX");

        let err = ParxWriter::from_parquet_bytes(&data).unwrap_err();
        assert!(matches!(
            err,
            ParxError::InvalidParquetMagic(m) if m == *b"XXXX"
        ));
    }

    /// A footer length word larger than the file must be rejected with both
    /// the declared length and the actual file size.
    #[test]
    fn test_from_parquet_bytes_invalid_footer_length() {
        let mut data = valid_parquet_bytes();
        let file_size = data.len() as u64;
        let data_len = data.len();
        data[data_len - 8..data_len - 4].copy_from_slice(&(100u32).to_le_bytes());

        let err = ParxWriter::from_parquet_bytes(&data).unwrap_err();
        assert!(matches!(
            err,
            ParxError::InvalidParquetFooterLength {
                footer_len: 100,
                file_size: f
            } if f == file_size
        ));
    }

    /// Reading from disk captures the path as the source URI plus the footer.
    #[test]
    fn test_from_parquet_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("data.parquet");
        let data = valid_parquet_bytes();
        std::fs::write(&path, data).unwrap();

        let writer = ParxWriter::from_parquet_file(&path).unwrap();
        assert_eq!(writer.source_uri, path.display().to_string());
        assert_eq!(writer.footer_bytes, Bytes::from_static(b"abc"));
    }

    /// The file-based path rejects an oversized footer length like the
    /// in-memory path does.
    #[test]
    fn test_from_parquet_file_invalid_footer_length() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("broken.parquet");
        let mut data = valid_parquet_bytes();
        let len = data.len();
        data[len - 8..len - 4].copy_from_slice(&(100u32).to_le_bytes());
        std::fs::write(&path, data).unwrap();

        let err = ParxWriter::from_parquet_file(&path).unwrap_err();
        assert!(matches!(
            err,
            ParxError::InvalidParquetFooterLength {
                footer_len: 100,
                ..
            }
        ));
    }
}