pcapsql_core/io/
decompress.rs

//! Compression detection and decompression support.
//!
//! Provides a unified `DecompressReader<R>` that wraps the supported decompression
//! formats and implements `Read`. The format is selected with enum dispatch rather
//! than a boxed trait object, so choosing a decoder needs no extra heap allocation.
//!
//! The reader is generic over any `R: Read`, allowing it to work with:
//! - `File` for standard file I/O
//! - `Cursor<MmapSlice>` for memory-mapped files
//! - Any other `Read` source (e.g., network streams, S3)
10
11#[cfg(feature = "compress-zstd")]
12use std::io::BufReader;
13use std::io::{self, Read};
14#[cfg(feature = "mmap")]
15use std::sync::Arc;
16
17use flate2::read::GzDecoder;
18#[cfg(feature = "mmap")]
19use memmap2::Mmap;
20
/// Compression format of an input stream, as recognised from its magic bytes.
///
/// Only `None` and `Gzip` are always available; every other variant exists only
/// when its matching `compress-*` cargo feature is enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression
    None,
    /// Gzip (.gz)
    Gzip,
    /// Zstandard (.zst)
    #[cfg(feature = "compress-zstd")]
    Zstd,
    /// LZ4 frame format (.lz4)
    #[cfg(feature = "compress-lz4")]
    Lz4,
    /// Bzip2 (.bz2)
    #[cfg(feature = "compress-bzip2")]
    Bzip2,
    /// XZ/LZMA (.xz)
    #[cfg(feature = "compress-xz")]
    Xz,
}

impl Compression {
    /// Detect the compression format from the leading magic bytes of `data`.
    ///
    /// Returns [`Compression::None`] when no enabled format's signature matches,
    /// and always for inputs shorter than 6 bytes. Formats whose cargo feature is
    /// disabled are never reported.
    pub fn detect(data: &[u8]) -> Self {
        // The longest signature checked below (xz) is 6 bytes; shorter inputs
        // are conservatively reported as uncompressed.
        if data.len() < 6 {
            return Compression::None;
        }

        match data {
            // Gzip: 1f 8b
            [0x1f, 0x8b, ..] => Compression::Gzip,

            // Zstd: 28 b5 2f fd
            #[cfg(feature = "compress-zstd")]
            [0x28, 0xb5, 0x2f, 0xfd, ..] => Compression::Zstd,

            // LZ4 frame: 04 22 4d 18
            #[cfg(feature = "compress-lz4")]
            [0x04, 0x22, 0x4d, 0x18, ..] => Compression::Lz4,

            // Bzip2: "BZh" followed by the block-size digit '1'..='9'. Matching
            // only "BZ" would misclassify any input that merely starts with it.
            #[cfg(feature = "compress-bzip2")]
            [b'B', b'Z', b'h', b'1'..=b'9', ..] => Compression::Bzip2,

            // XZ: fd 37 7a 58 5a 00
            #[cfg(feature = "compress-xz")]
            [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00, ..] => Compression::Xz,

            _ => Compression::None,
        }
    }

    /// Get the typical file extension (without the dot) for this format, or
    /// `None` for uncompressed data.
    pub fn extension(&self) -> Option<&'static str> {
        match self {
            Compression::None => None,
            Compression::Gzip => Some("gz"),
            #[cfg(feature = "compress-zstd")]
            Compression::Zstd => Some("zst"),
            #[cfg(feature = "compress-lz4")]
            Compression::Lz4 => Some("lz4"),
            #[cfg(feature = "compress-bzip2")]
            Compression::Bzip2 => Some("bz2"),
            #[cfg(feature = "compress-xz")]
            Compression::Xz => Some("xz"),
        }
    }

    /// Return `true` for every variant except [`Compression::None`].
    pub fn is_compressed(&self) -> bool {
        !matches!(self, Compression::None)
    }
}

impl std::fmt::Display for Compression {
    /// Write the lowercase format name (e.g. `gzip`, `zstd`, `none`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Compression::None => "none",
            Compression::Gzip => "gzip",
            #[cfg(feature = "compress-zstd")]
            Compression::Zstd => "zstd",
            #[cfg(feature = "compress-lz4")]
            Compression::Lz4 => "lz4",
            #[cfg(feature = "compress-bzip2")]
            Compression::Bzip2 => "bzip2",
            #[cfg(feature = "compress-xz")]
            Compression::Xz => "xz",
        };
        f.write_str(name)
    }
}
112
/// A cheaply clonable view into a shared memory map, starting at a byte offset.
///
/// Implements `AsRef<[u8]>` so it can be used as the inner type of a
/// `std::io::Cursor`.
///
/// This type is only available when the `mmap` feature is enabled.
#[cfg(feature = "mmap")]
#[derive(Clone)]
pub struct MmapSlice {
    mmap: Arc<Mmap>,
    /// Offset of the first visible byte within `mmap`.
    start: usize,
}

#[cfg(feature = "mmap")]
impl MmapSlice {
    /// Create a view over the whole mapping.
    pub fn new(mmap: Arc<Mmap>) -> Self {
        Self::with_offset(mmap, 0)
    }

    /// Create a view that starts `start` bytes into the mapping.
    ///
    /// An offset at or past the end of the mapping yields an empty view rather
    /// than a panic when the bytes are later accessed.
    pub fn with_offset(mmap: Arc<Mmap>, start: usize) -> Self {
        Self { mmap, start }
    }
}

#[cfg(feature = "mmap")]
impl AsRef<[u8]> for MmapSlice {
    fn as_ref(&self) -> &[u8] {
        // Checked slicing: `Cursor`'s `Read` impl calls this on every read, so
        // an out-of-range offset must expose no bytes instead of panicking.
        let bytes: &[u8] = &self.mmap;
        bytes.get(self.start..).unwrap_or(&[])
    }
}
143
144/// Unified decompression reader that wraps various decompression formats.
145///
146/// Generic over any `R: Read`, enabling composition with:
147/// - `File` for standard file I/O
148/// - `Cursor<MmapSlice>` for memory-mapped files
149/// - Any other `Read` source
150///
151/// Uses enum dispatch rather than trait objects to avoid allocation
152/// and enable potential inlining. The `Read` implementation simply
153/// delegates to the inner decoder.
154pub enum DecompressReader<R: Read> {
155    /// No compression - pass-through
156    None(R),
157
158    /// Gzip decompression
159    Gzip(GzDecoder<R>),
160
161    /// Zstandard decompression
162    #[cfg(feature = "compress-zstd")]
163    Zstd(zstd::Decoder<'static, BufReader<R>>),
164
165    /// LZ4 frame decompression
166    #[cfg(feature = "compress-lz4")]
167    Lz4(lz4_flex::frame::FrameDecoder<R>),
168
169    /// Bzip2 decompression
170    #[cfg(feature = "compress-bzip2")]
171    Bzip2(bzip2::read::BzDecoder<R>),
172
173    /// XZ/LZMA decompression
174    #[cfg(feature = "compress-xz")]
175    Xz(xz2::read::XzDecoder<R>),
176}
177
178impl<R: Read> DecompressReader<R> {
179    /// Create a decompression reader with explicit compression format.
180    pub fn new(source: R, compression: Compression) -> io::Result<Self> {
181        match compression {
182            Compression::None => Ok(DecompressReader::None(source)),
183
184            Compression::Gzip => Ok(DecompressReader::Gzip(GzDecoder::new(source))),
185
186            #[cfg(feature = "compress-zstd")]
187            Compression::Zstd => {
188                let decoder = zstd::Decoder::new(source)?;
189                Ok(DecompressReader::Zstd(decoder))
190            }
191
192            #[cfg(feature = "compress-lz4")]
193            Compression::Lz4 => {
194                let decoder = lz4_flex::frame::FrameDecoder::new(source);
195                Ok(DecompressReader::Lz4(decoder))
196            }
197
198            #[cfg(feature = "compress-bzip2")]
199            Compression::Bzip2 => {
200                let decoder = bzip2::read::BzDecoder::new(source);
201                Ok(DecompressReader::Bzip2(decoder))
202            }
203
204            #[cfg(feature = "compress-xz")]
205            Compression::Xz => {
206                let decoder = xz2::read::XzDecoder::new(source);
207                Ok(DecompressReader::Xz(decoder))
208            }
209        }
210    }
211
212    /// Get the compression format this reader handles.
213    pub fn compression(&self) -> Compression {
214        match self {
215            DecompressReader::None(_) => Compression::None,
216            DecompressReader::Gzip(_) => Compression::Gzip,
217            #[cfg(feature = "compress-zstd")]
218            DecompressReader::Zstd(_) => Compression::Zstd,
219            #[cfg(feature = "compress-lz4")]
220            DecompressReader::Lz4(_) => Compression::Lz4,
221            #[cfg(feature = "compress-bzip2")]
222            DecompressReader::Bzip2(_) => Compression::Bzip2,
223            #[cfg(feature = "compress-xz")]
224            DecompressReader::Xz(_) => Compression::Xz,
225        }
226    }
227}
228
229impl<R: Read> Read for DecompressReader<R> {
230    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
231        match self {
232            DecompressReader::None(r) => r.read(buf),
233            DecompressReader::Gzip(r) => r.read(buf),
234            #[cfg(feature = "compress-zstd")]
235            DecompressReader::Zstd(r) => r.read(buf),
236            #[cfg(feature = "compress-lz4")]
237            DecompressReader::Lz4(r) => r.read(buf),
238            #[cfg(feature = "compress-bzip2")]
239            DecompressReader::Bzip2(r) => r.read(buf),
240            #[cfg(feature = "compress-xz")]
241            DecompressReader::Xz(r) => r.read(buf),
242        }
243    }
244}
245
246// DecompressReader is Send when R is Send
247unsafe impl<R: Read + Send> Send for DecompressReader<R> {}
248
249// Required for async compatibility
250impl<R: Read + Unpin> Unpin for DecompressReader<R> {}
251
252// =============================================================================
253// In-memory decompression for header detection
254// =============================================================================
255
256use std::io::Cursor;
257
258/// Decompress a header buffer in memory.
259///
260/// This is used during cloud source initialization to extract the PCAP header
261/// from compressed data without making additional HTTP requests.
262///
263/// # Arguments
264/// * `compressed` - Compressed bytes (e.g., first 64KB from cloud object)
265/// * `compression` - Detected compression format
266/// * `output_size` - Maximum bytes to decompress (e.g., 1024 for PCAP header)
267///
268/// # Returns
269/// Decompressed bytes, up to `output_size` bytes.
270pub fn decompress_header(
271    compressed: &[u8],
272    compression: Compression,
273    output_size: usize,
274) -> io::Result<Vec<u8>> {
275    if !compression.is_compressed() {
276        // No compression - just return a copy of the data
277        let len = compressed.len().min(output_size);
278        return Ok(compressed[..len].to_vec());
279    }
280
281    // Create a cursor over the compressed data
282    let cursor = Cursor::new(compressed);
283
284    // Create the appropriate decoder
285    let mut decoder = DecompressReader::new(cursor, compression)?;
286
287    // Read up to output_size bytes
288    let mut output = vec![0u8; output_size];
289    let mut total_read = 0;
290
291    while total_read < output_size {
292        match decoder.read(&mut output[total_read..]) {
293            Ok(0) => break, // EOF
294            Ok(n) => total_read += n,
295            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
296            Err(e) => return Err(e),
297        }
298    }
299
300    output.truncate(total_read);
301    Ok(output)
302}
303
304// =============================================================================
305// Type aliases for convenience
306// =============================================================================
307
308use std::fs::File;
309
310/// Type alias for file-based decompression.
311pub type FileDecoder = DecompressReader<File>;
312
313/// Type alias for mmap-based decompression.
314///
315/// Only available when the `mmap` feature is enabled.
316#[cfg(feature = "mmap")]
317pub type AnyDecoder = DecompressReader<Cursor<MmapSlice>>;
318
#[cfg(feature = "mmap")]
impl AnyDecoder {
    /// Build a decoder over `mmap`, choosing the format by running
    /// [`Compression::detect`] on the mapping's leading bytes.
    pub fn from_mmap(mmap: Arc<Mmap>) -> io::Result<Self> {
        let detected = Compression::detect(&mmap);
        Self::with_compression_mmap(mmap, detected)
    }

    /// Build a decoder over the whole of `mmap` using the given format,
    /// without inspecting the data.
    pub fn with_compression_mmap(mmap: Arc<Mmap>, compression: Compression) -> io::Result<Self> {
        DecompressReader::new(Cursor::new(MmapSlice::new(mmap)), compression)
    }
}
336
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// First eight bytes of a little-endian PCAP global header (magic number
    /// plus version 2.4), used as a small recognisable payload.
    const PCAP_PREFIX: [u8; 8] = [0xd4, 0xc3, 0xb2, 0xa1, 0x00, 0x02, 0x00, 0x04];

    /// Gzip-compress `payload` at flate2's default level.
    fn gzip(payload: &[u8]) -> Vec<u8> {
        let mut encoder =
            flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
        encoder.write_all(payload).unwrap();
        encoder.finish().unwrap()
    }

    #[test]
    fn test_detect_no_compression() {
        // A PCAP magic number is not a compression signature.
        let pcap_magic = [0xd4, 0xc3, 0xb2, 0xa1, 0x00, 0x00];
        assert_eq!(Compression::detect(&pcap_magic), Compression::None);
    }

    #[test]
    fn test_detect_gzip() {
        let gz_magic = [0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00];
        assert_eq!(Compression::detect(&gz_magic), Compression::Gzip);
    }

    #[cfg(feature = "compress-zstd")]
    #[test]
    fn test_detect_zstd() {
        let zstd_magic = [0x28, 0xb5, 0x2f, 0xfd, 0x00, 0x00];
        assert_eq!(Compression::detect(&zstd_magic), Compression::Zstd);
    }

    #[cfg(feature = "compress-lz4")]
    #[test]
    fn test_detect_lz4() {
        let lz4_magic = [0x04, 0x22, 0x4d, 0x18, 0x00, 0x00];
        assert_eq!(Compression::detect(&lz4_magic), Compression::Lz4);
    }

    #[cfg(feature = "compress-bzip2")]
    #[test]
    fn test_detect_bzip2() {
        let bz_magic = [0x42, 0x5a, 0x68, 0x39, 0x00, 0x00];
        assert_eq!(Compression::detect(&bz_magic), Compression::Bzip2);
    }

    #[cfg(feature = "compress-xz")]
    #[test]
    fn test_detect_xz() {
        let xz_magic = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00];
        assert_eq!(Compression::detect(&xz_magic), Compression::Xz);
    }

    #[test]
    fn test_detect_short_data() {
        // Gzip magic alone is below the 6-byte detection minimum.
        let too_short = [0x1f, 0x8b];
        assert_eq!(Compression::detect(&too_short), Compression::None);
    }

    #[test]
    fn test_compression_display() {
        assert_eq!(Compression::None.to_string(), "none");
        assert_eq!(Compression::Gzip.to_string(), "gzip");
    }

    #[test]
    fn test_compression_extension() {
        assert_eq!(Compression::None.extension(), None);
        assert_eq!(Compression::Gzip.extension(), Some("gz"));
    }

    #[test]
    fn test_compression_is_compressed() {
        assert!(!Compression::None.is_compressed());
        assert!(Compression::Gzip.is_compressed());
    }

    #[test]
    fn test_decompress_header_no_compression() {
        let header = decompress_header(&PCAP_PREFIX, Compression::None, 100).unwrap();
        assert_eq!(header, PCAP_PREFIX);
    }

    #[test]
    fn test_decompress_header_no_compression_truncated() {
        // Ask for fewer bytes than are available.
        let header = decompress_header(&PCAP_PREFIX, Compression::None, 4).unwrap();
        assert_eq!(header.as_slice(), &PCAP_PREFIX[..4]);
    }

    #[test]
    fn test_decompress_header_gzip() {
        let packed = gzip(&PCAP_PREFIX);
        let header = decompress_header(&packed, Compression::Gzip, 100).unwrap();
        assert_eq!(header, PCAP_PREFIX);
    }

    #[test]
    fn test_decompress_header_gzip_partial() {
        // Only the first four decompressed bytes are requested.
        let packed = gzip(&PCAP_PREFIX);
        let header = decompress_header(&packed, Compression::Gzip, 4).unwrap();
        assert_eq!(header.as_slice(), &PCAP_PREFIX[..4]);
    }

    #[cfg(feature = "compress-zstd")]
    #[test]
    fn test_decompress_header_zstd() {
        let packed = zstd::encode_all(&PCAP_PREFIX[..], 3).unwrap();
        let header = decompress_header(&packed, Compression::Zstd, 100).unwrap();
        assert_eq!(header, PCAP_PREFIX);
    }
}