// parx_rs/writer.rs
1/*
2 * Copyright 2026 PARX Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16//! PARX file writer.
17
18use crate::compression::{self, should_auto_compress};
19use crate::error::{ParxError, Result};
20use crate::format::{Compression, Header, Trailer, HEADER_SIZE, MAGIC};
21use crate::proto::ParxManifest;
22use bytes::Bytes;
23use prost::Message;
24use std::fs::File;
25use std::io::{Read, Seek, SeekFrom};
26use std::path::Path;
27use std::time::{SystemTime, UNIX_EPOCH};
28
/// Writer for PARX sidecar files.
///
/// Builds a PARX file from Parquet footer bytes and optional extensions.
#[derive(Debug)]
pub struct ParxWriter {
    /// Informational URI of the source Parquet file (may be empty).
    source_uri: String,
    /// Size in bytes of the source Parquet file, recorded in the manifest.
    source_size: u64,
    /// Raw (uncompressed) Parquet footer bytes to cache.
    footer_bytes: Bytes,
    /// Compression for cached payloads; `None` stores them as-is.
    compression: Option<Compression>,
    /// Concatenated page index bytes (ColumnIndex/OffsetIndex); empty if absent.
    page_index_bytes: Bytes,
}
40
41impl ParxWriter {
42    /// Create a new PARX writer.
43    #[inline]
44    pub fn new() -> Self {
45        Self {
46            source_uri: String::new(),
47            source_size: 0,
48            footer_bytes: Bytes::new(),
49            compression: None,
50            page_index_bytes: Bytes::new(),
51        }
52    }
53
54    /// Create a writer pre-populated from in-memory Parquet bytes.
55    ///
56    /// Validates the PAR1 magic, extracts the footer, and sets `source_size`.
57    pub fn from_parquet_bytes(data: &[u8]) -> Result<Self> {
58        // Parquet minimum: 4 (magic) + 4 (footer len) + 4 (magic) = 12
59        if data.len() < 12 {
60            return Err(ParxError::FileTooSmall {
61                size: data.len(),
62                minimum: 12,
63            });
64        }
65
66        // Validate leading PAR1 magic
67        let mut head_magic = [0u8; 4];
68        head_magic.copy_from_slice(&data[0..4]);
69        if &head_magic != b"PAR1" {
70            return Err(ParxError::InvalidParquetMagic(head_magic));
71        }
72
73        // Validate trailing PAR1 magic
74        let mut tail_magic = [0u8; 4];
75        tail_magic.copy_from_slice(&data[data.len() - 4..]);
76        if &tail_magic != b"PAR1" {
77            return Err(ParxError::InvalidParquetMagic(tail_magic));
78        }
79
80        // Footer length is a little-endian u32 at offset len-8
81        let footer_len = u32::from_le_bytes([
82            data[data.len() - 8],
83            data[data.len() - 7],
84            data[data.len() - 6],
85            data[data.len() - 5],
86        ]) as u64;
87
88        let file_size = data.len() as u64;
89        // footer + 8 bytes (footer_len + magic) + 4 bytes (leading magic) must fit
90        if footer_len + 12 > file_size {
91            return Err(ParxError::InvalidParquetFooterLength {
92                footer_len,
93                file_size,
94            });
95        }
96
97        let footer_start = data.len() - 8 - footer_len as usize;
98        let footer_bytes = &data[footer_start..data.len() - 8];
99
100        let mut writer = Self::new();
101        writer.set_source_size(file_size);
102        writer.set_footer(footer_bytes);
103        Ok(writer)
104    }
105
106    /// Create a writer pre-populated from a Parquet file on disk.
107    ///
108    /// Reads the file tail, validates PAR1 magic, extracts the footer,
109    /// and sets both `source_size` and `source_uri`.
110    pub fn from_parquet_file(path: impl AsRef<Path>) -> Result<Self> {
111        let path = path.as_ref();
112        let (file_size, footer_bytes) = read_parquet_footer_from_file(path)?;
113        let mut writer = Self::new();
114        writer.set_source_size(file_size);
115        writer.set_footer_owned(footer_bytes);
116        writer.set_source_uri(path.display().to_string());
117        Ok(writer)
118    }
119
    /// Set the source Parquet file URI (optional, informational).
    ///
    /// Stored verbatim in the manifest; not used for validation.
    #[inline]
    pub fn set_source_uri(&mut self, uri: impl Into<String>) {
        self.source_uri = uri.into();
    }
125
    /// Set the source Parquet file size (required for validation).
    ///
    /// Recorded in the manifest as `source_size`.
    #[inline]
    pub fn set_source_size(&mut self, size: u64) {
        self.source_size = size;
    }
131
    /// Set the Parquet footer bytes to cache.
    ///
    /// Copies `bytes` into an owned buffer; use [`Self::set_footer_owned`]
    /// to avoid the copy when you already own the data.
    #[inline]
    pub fn set_footer(&mut self, bytes: &[u8]) {
        self.footer_bytes = Bytes::copy_from_slice(bytes);
    }
137
    /// Set the Parquet footer bytes to cache (move version, avoids copy when possible).
    ///
    /// Accepts anything convertible into [`Bytes`], e.g. `Vec<u8>`.
    #[inline]
    pub fn set_footer_owned(&mut self, bytes: impl Into<Bytes>) {
        self.footer_bytes = bytes.into();
    }
143
    /// Set the compression algorithm for the footer.
    ///
    /// The same algorithm is also applied to the page index payload
    /// (if any) when the file is built.
    #[inline]
    pub fn set_compression(&mut self, compression: Compression) {
        self.compression = Some(compression);
    }
149
    /// Clear compression (store footer uncompressed).
    #[inline]
    pub fn clear_compression(&mut self) {
        self.compression = None;
    }
155
156    /// Enable auto-compression if footer exceeds threshold.
157    ///
158    /// Uses Zstd compression for footers larger than 10KB.
159    #[inline]
160    pub fn auto_compress(&mut self) {
161        if should_auto_compress(self.footer_bytes.len()) {
162            self.compression = Some(Compression::Zstd);
163        }
164    }
165
    /// Get the current compression setting.
    ///
    /// `None` means payloads will be stored uncompressed.
    #[inline]
    pub const fn compression(&self) -> Option<Compression> {
        self.compression
    }
171
    /// Get the source Parquet file size.
    #[inline]
    pub const fn source_size(&self) -> u64 {
        self.source_size
    }
177
    /// Get the cached footer size in bytes.
    ///
    /// This is the original (uncompressed) footer length; compression is
    /// only applied when the file is built.
    #[inline]
    pub fn footer_size(&self) -> usize {
        self.footer_bytes.len()
    }
183
    /// Set the page index bytes to cache.
    ///
    /// Page indexes include `ColumnIndex` and `OffsetIndex` structures from Parquet V2.
    /// These are concatenated together in the order they appear in the file.
    /// Copies `bytes`; use [`Self::set_page_indexes_owned`] to avoid the copy.
    #[inline]
    pub fn set_page_indexes(&mut self, bytes: &[u8]) {
        self.page_index_bytes = Bytes::copy_from_slice(bytes);
    }
192
    /// Set the page index bytes to cache (move version, avoids copy when possible).
    #[inline]
    pub fn set_page_indexes_owned(&mut self, bytes: impl Into<Bytes>) {
        self.page_index_bytes = bytes.into();
    }
198
    /// Check if page indexes have been added.
    ///
    /// True once a non-empty page index buffer has been set.
    #[inline]
    pub fn has_page_indexes(&self) -> bool {
        !self.page_index_bytes.is_empty()
    }
204
205    /// Build the PARX file bytes.
206    ///
207    /// Layout:
208    /// - Header (16 bytes)
209    /// - Footer payload (variable, possibly compressed)
210    /// - Page index payload (variable, optional, possibly compressed)
211    /// - Manifest (variable, Protobuf)
212    /// - Trailer (12 bytes)
213    ///
214    /// # Panics
215    /// Panics if manifest exceeds 4GB.
216    pub fn finish(self) -> Vec<u8> {
217        let mut header = Header::new();
218
219        // Compute original footer hash (before compression)
220        let source_footer_checksum = crc32c::crc32c(&self.footer_bytes).to_le_bytes().to_vec();
221
222        // Apply compression if requested
223        let (footer_payload, footer_uncompressed_size) = match self.compression {
224            Some(algo) => {
225                header.set_compression(algo);
226                let compressed = compression::compress(&self.footer_bytes, algo)
227                    .expect("compression should not fail on valid data");
228                (compressed, self.footer_bytes.len() as u64)
229            }
230            None => (self.footer_bytes.to_vec(), 0),
231        };
232
233        // Checksum is computed on stored bytes (possibly compressed)
234        let footer_checksum = crc32c::crc32c(&footer_payload).to_le_bytes().to_vec();
235
236        // Footer starts right after header
237        let footer_offset = HEADER_SIZE as u64;
238        let footer_length = footer_payload.len() as u64;
239
240        // Build page index section if present
241        let page_index_offset = footer_offset + footer_length;
242        let (page_index_payload, page_index_uncompressed_size) = if self.page_index_bytes.is_empty()
243        {
244            (Vec::new(), 0)
245        } else {
246            // Apply same compression as footer
247            match self.compression {
248                Some(algo) => {
249                    let compressed = compression::compress(&self.page_index_bytes, algo)
250                        .expect("compression should not fail on valid data");
251                    (compressed, self.page_index_bytes.len() as u64)
252                }
253                None => (self.page_index_bytes.to_vec(), 0),
254            }
255        };
256
257        let page_index_length = page_index_payload.len() as u64;
258        let page_index_checksum = if page_index_length > 0 {
259            crc32c::crc32c(&page_index_payload).to_le_bytes().to_vec()
260        } else {
261            Vec::new()
262        };
263
264        // Get current timestamp (safe cast: won't overflow until year 2554)
265        #[allow(clippy::cast_possible_truncation)]
266        let created_at_ms = SystemTime::now()
267            .duration_since(UNIX_EPOCH)
268            .map(|d| d.as_millis() as u64)
269            .unwrap_or(0);
270
271        let header_bytes = header.to_bytes();
272
273        // Build manifest
274        let manifest = ParxManifest {
275            version: 1,
276            source_uri: self.source_uri,
277            source_size: self.source_size,
278            source_footer_checksum,
279            footer_offset,
280            footer_length,
281            footer_checksum,
282            created_at_ms,
283            // Compression field
284            footer_uncompressed_size,
285            // Page index fields
286            page_index_offset,
287            page_index_length,
288            page_index_checksum,
289            page_index_uncompressed_size,
290        };
291
292        // Encode manifest
293        let manifest_bytes = manifest.encode_to_vec();
294        let manifest_crc = crc32c::crc32c(&manifest_bytes);
295
296        // Build trailer
297        let manifest_len = u32::try_from(manifest_bytes.len()).expect("manifest too large (>4GB)");
298        let trailer = Trailer::new(manifest_len, manifest_crc, MAGIC);
299        let trailer_bytes = trailer.to_bytes();
300
301        // Assemble file with exact capacity
302        let total_size = header_bytes.len()
303            + footer_payload.len()
304            + page_index_payload.len()
305            + manifest_bytes.len()
306            + trailer_bytes.len();
307
308        let mut output = Vec::with_capacity(total_size);
309        output.extend_from_slice(&header_bytes);
310        output.extend_from_slice(&footer_payload);
311        output.extend_from_slice(&page_index_payload);
312        output.extend_from_slice(&manifest_bytes);
313        output.extend_from_slice(&trailer_bytes);
314
315        output
316    }
317}
318
// A default writer is identical to `ParxWriter::new()`: empty, uncompressed.
impl Default for ParxWriter {
    fn default() -> Self {
        Self::new()
    }
}
324
325fn read_parquet_footer_from_file(path: &Path) -> Result<(u64, Bytes)> {
326    let mut file = File::open(path)?;
327    let file_size = file.metadata()?.len();
328
329    if file_size < 12 {
330        return Err(ParxError::FileTooSmall {
331            size: usize::try_from(file_size).unwrap_or(usize::MAX),
332            minimum: 12,
333        });
334    }
335
336    let mut head_magic = [0u8; 4];
337    file.read_exact(&mut head_magic)?;
338    if &head_magic != b"PAR1" {
339        return Err(ParxError::InvalidParquetMagic(head_magic));
340    }
341
342    file.seek(SeekFrom::End(-8))?;
343    let mut footer_trailer = [0u8; 8];
344    file.read_exact(&mut footer_trailer)?;
345
346    let footer_len = u32::from_le_bytes(footer_trailer[..4].try_into().expect("slice len")) as u64;
347    let tail_magic: [u8; 4] = footer_trailer[4..8].try_into().expect("slice len");
348    if &tail_magic != b"PAR1" {
349        return Err(ParxError::InvalidParquetMagic(tail_magic));
350    }
351
352    if footer_len + 12 > file_size {
353        return Err(ParxError::InvalidParquetFooterLength {
354            footer_len,
355            file_size,
356        });
357    }
358
359    let footer_start = file_size - 8 - footer_len;
360    file.seek(SeekFrom::Start(footer_start))?;
361    let mut footer = vec![0u8; footer_len as usize];
362    file.read_exact(&mut footer)?;
363
364    Ok((file_size, Bytes::from(footer)))
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    // Smallest valid Parquet layout: PAR1 + "abc" footer + footer length
    // (LE u32) + PAR1. Shared fixture for the parsing tests below.
    fn valid_parquet_bytes() -> Vec<u8> {
        let footer = b"abc";
        let mut data = Vec::new();
        data.extend_from_slice(b"PAR1");
        data.extend_from_slice(footer);
        data.extend_from_slice(&(footer.len() as u32).to_le_bytes());
        data.extend_from_slice(b"PAR1");
        data
    }

    // finish() must frame the output with the PARX magic at both ends.
    #[test]
    fn test_writer_creates_valid_structure() {
        let mut writer = ParxWriter::new();
        writer.set_source_size(1000);
        writer.set_footer(b"test footer");

        let bytes = writer.finish();

        // Check header magic
        assert_eq!(&bytes[0..4], b"PARX");

        // Check trailer magic (last 4 bytes)
        assert_eq!(&bytes[bytes.len() - 4..], b"PARX");
    }

    // The move-based footer setter must produce the same valid structure
    // as the copying one.
    #[test]
    fn test_set_footer_owned() {
        let footer = vec![1, 2, 3, 4, 5];
        let mut writer = ParxWriter::new();
        writer.set_source_size(100);
        writer.set_footer_owned(footer);

        let bytes = writer.finish();
        assert_eq!(&bytes[0..4], b"PARX");
    }

    // Explicitly requested Zstd must be reflected in the header flags.
    #[test]
    fn test_writer_with_compression() {
        let footer = b"test footer data that will be compressed".repeat(100);
        let mut writer = ParxWriter::new();
        writer.set_source_size(1000);
        writer.set_footer(&footer);
        writer.set_compression(Compression::Zstd);

        let bytes = writer.finish();

        // Check header magic
        assert_eq!(&bytes[0..4], b"PARX");

        // Verify compression flag is set
        let header = Header::from_bytes(bytes[..HEADER_SIZE].try_into().unwrap());
        assert!(header.is_footer_compressed());
        assert_eq!(header.compression_algorithm(), Some(Compression::Zstd));
    }

    // auto_compress() engages only past the size threshold.
    #[test]
    fn test_auto_compress() {
        // Small footer - no compression
        let mut writer = ParxWriter::new();
        writer.set_footer(b"small");
        writer.auto_compress();
        assert!(writer.compression.is_none());

        // Large footer - auto compression
        let mut writer = ParxWriter::new();
        writer.set_footer(&vec![0u8; 20_000]);
        writer.auto_compress();
        assert_eq!(writer.compression, Some(Compression::Zstd));
    }

    // In-memory parsing extracts the footer and records the file size.
    #[test]
    fn test_from_parquet_bytes() {
        let data = valid_parquet_bytes();
        let writer = ParxWriter::from_parquet_bytes(&data).unwrap();

        assert_eq!(writer.source_size, data.len() as u64);
        assert_eq!(writer.footer_bytes, Bytes::from_static(b"abc"));
    }

    // A corrupted leading magic is reported with the bad bytes.
    #[test]
    fn test_from_parquet_bytes_invalid_magic() {
        let mut data = valid_parquet_bytes();
        data[0..4].copy_from_slice(b"XXXX");

        let err = ParxWriter::from_parquet_bytes(&data).unwrap_err();
        assert!(matches!(
            err,
            ParxError::InvalidParquetMagic(m) if m == *b"XXXX"
        ));
    }

    // A footer length that cannot fit inside the file is rejected.
    #[test]
    fn test_from_parquet_bytes_invalid_footer_length() {
        let mut data = valid_parquet_bytes();
        let file_size = data.len() as u64;
        let data_len = data.len();
        data[data_len - 8..data_len - 4].copy_from_slice(&(100u32).to_le_bytes());

        let err = ParxWriter::from_parquet_bytes(&data).unwrap_err();
        assert!(matches!(
            err,
            ParxError::InvalidParquetFooterLength {
                footer_len: 100,
                file_size: f
            } if f == file_size
        ));
    }

    // On-disk parsing sets the source URI and extracts the footer.
    #[test]
    fn test_from_parquet_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("data.parquet");
        let data = valid_parquet_bytes();
        std::fs::write(&path, data).unwrap();

        let writer = ParxWriter::from_parquet_file(&path).unwrap();
        assert_eq!(writer.source_uri, path.display().to_string());
        assert_eq!(writer.footer_bytes, Bytes::from_static(b"abc"));
    }

    // The on-disk path performs the same footer-length bounds check.
    #[test]
    fn test_from_parquet_file_invalid_footer_length() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("broken.parquet");
        let mut data = valid_parquet_bytes();
        let len = data.len();
        data[len - 8..len - 4].copy_from_slice(&(100u32).to_le_bytes());
        std::fs::write(&path, data).unwrap();

        let err = ParxWriter::from_parquet_file(&path).unwrap_err();
        assert!(matches!(
            err,
            ParxError::InvalidParquetFooterLength {
                footer_len: 100,
                ..
            }
        ));
    }
}