pcapsql_core/io/
mmap.rs

1//! Memory-mapped packet source for efficient packet access.
2//!
3//! This module is only available when the `mmap` feature is enabled.
4//!
//! Uses the `memmap2` crate for platform-independent memory mapping, giving
//! efficient access to packet data in large files.
7//!
8//! ## Supported Formats
9//!
10//! - Classic PCAP (little/big endian, micro/nanosecond timestamps)
11//! - PCAPNG
//! - Any of the above, compressed with gzip, zstd, lz4, bzip2, or xz
//!   (zstd, lz4, bzip2 and xz each require their `compress-*` feature)
13//!
14//! ## Design
15//!
16//! This module uses the unified layer stack:
17//! ```text
18//! MmapPacketReader
19//!     └── GenericPcapReader<DecompressReader<Cursor<MmapSlice>>>
20//!             └── DecompressReader handles compression
21//!                     └── Cursor<MmapSlice> provides Read over mmap
22//! ```
23//!
24//! Benchmarks show this approach (using pcap_parser) is 30% faster than
25//! custom byte parsing while being more maintainable.
26
27use std::fs::File;
28use std::io::Cursor;
29use std::path::{Path, PathBuf};
30use std::sync::Arc;
31
32use memmap2::Mmap;
33
34use crate::error::{Error, PcapError};
35
36use super::decompress::{Compression, DecompressReader, MmapSlice};
37use super::pcap_stream::{GenericPcapReader, PcapFormat};
38use super::{
39    PacketPosition, PacketRange, PacketReader, PacketRef, PacketSource, PacketSourceMetadata,
40};
41
42/// Memory-mapped packet source.
43///
44/// Maps the entire file into virtual memory, allowing the OS to handle
45/// caching and paging. Supports both uncompressed and compressed files.
46#[derive(Clone)]
47pub struct MmapPacketSource {
48    /// Path to the file (for error messages)
49    path: PathBuf,
50    /// Memory-mapped region (shared via Arc)
51    mmap: Arc<Mmap>,
52    /// Cached metadata
53    metadata: PacketSourceMetadata,
54    /// Detected compression format
55    compression: Compression,
56    /// PCAP format (after decompression)
57    pcap_format: PcapFormat,
58}
59
60impl MmapPacketSource {
61    /// Open a PCAP or PCAPNG file with memory mapping.
62    ///
63    /// Automatically detects and handles compression.
64    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
65        let path = path.as_ref().to_path_buf();
66        let file = File::open(&path).map_err(Error::Io)?;
67
68        // Create memory mapping
69        let mmap = unsafe { Mmap::map(&file).map_err(Error::Io)? };
70
71        // Detect compression
72        let compression = Compression::detect(&mmap);
73
74        // Detect PCAP format (may need to decompress first bytes)
75        let (pcap_format, link_type) = if compression.is_compressed() {
76            Self::detect_format_compressed(&mmap, compression)?
77        } else {
78            Self::detect_format_uncompressed(&mmap)?
79        };
80
81        let metadata = PacketSourceMetadata {
82            link_type,
83            snaplen: 65535,
84            size_bytes: Some(mmap.len() as u64),
85            packet_count: None,
86            seekable: !compression.is_compressed(), // Can only seek in uncompressed
87        };
88
89        Ok(Self {
90            path,
91            mmap: Arc::new(mmap),
92            metadata,
93            compression,
94            pcap_format,
95        })
96    }
97
98    /// Detect format from uncompressed data.
99    fn detect_format_uncompressed(data: &[u8]) -> Result<(PcapFormat, u32), Error> {
100        if data.len() < 24 {
101            return Err(Error::Pcap(PcapError::InvalidFormat {
102                reason: "File too small for PCAP header".into(),
103            }));
104        }
105
106        let format = PcapFormat::detect(data)?;
107        let link_type = if format.is_pcapng() {
108            1 // Will be updated from interface description block
109        } else {
110            Self::link_type_from_header(data, &format)
111        };
112
113        Ok((format, link_type))
114    }
115
116    /// Extract link type from legacy PCAP header bytes.
117    fn link_type_from_header(data: &[u8], format: &PcapFormat) -> u32 {
118        if data.len() < 24 {
119            return 1; // Default to Ethernet
120        }
121        // Link type is at offset 20 in PCAP global header
122        let byte_swap = matches!(format, PcapFormat::LegacyBeMicro | PcapFormat::LegacyBeNano);
123        if byte_swap {
124            u32::from_be_bytes([data[20], data[21], data[22], data[23]])
125        } else {
126            u32::from_le_bytes([data[20], data[21], data[22], data[23]])
127        }
128    }
129
130    /// Detect format from compressed data (decompress first bytes).
131    fn detect_format_compressed(
132        data: &[u8],
133        compression: Compression,
134    ) -> Result<(PcapFormat, u32), Error> {
135        use std::io::Read;
136
137        // Decompress enough bytes to detect the format and link type
138        let mut decoder: Box<dyn Read> = match compression {
139            Compression::None => unreachable!(),
140            Compression::Gzip => {
141                let cursor = Cursor::new(data);
142                let gz = flate2::read::GzDecoder::new(cursor);
143                Box::new(gz) as Box<dyn Read>
144            }
145            #[cfg(feature = "compress-zstd")]
146            Compression::Zstd => {
147                let cursor = Cursor::new(data);
148                let zstd = zstd::Decoder::new(cursor)?;
149                Box::new(zstd) as Box<dyn Read>
150            }
151            #[cfg(feature = "compress-lz4")]
152            Compression::Lz4 => {
153                let cursor = Cursor::new(data);
154                let lz4 = lz4_flex::frame::FrameDecoder::new(cursor);
155                Box::new(lz4) as Box<dyn Read>
156            }
157            #[cfg(feature = "compress-bzip2")]
158            Compression::Bzip2 => {
159                let cursor = Cursor::new(data);
160                let bz2 = bzip2::read::BzDecoder::new(cursor);
161                Box::new(bz2) as Box<dyn Read>
162            }
163            #[cfg(feature = "compress-xz")]
164            Compression::Xz => {
165                let cursor = Cursor::new(data);
166                let xz = xz2::read::XzDecoder::new(cursor);
167                Box::new(xz) as Box<dyn Read>
168            }
169        };
170
171        // Read enough bytes to detect format and link type
172        let mut header = [0u8; 24];
173        decoder.read_exact(&mut header).map_err(|e| {
174            Error::Pcap(PcapError::InvalidFormat {
175                reason: format!("Failed to read compressed header: {e}"),
176            })
177        })?;
178
179        let format = PcapFormat::detect(&header)?;
180        let link_type = if format.is_pcapng() {
181            1 // Will be updated during reading
182        } else {
183            Self::link_type_from_header(&header, &format)
184        };
185
186        Ok((format, link_type))
187    }
188
189    /// Get the path to the file.
190    pub fn path(&self) -> &Path {
191        &self.path
192    }
193
194    /// Get the detected compression format.
195    pub fn compression(&self) -> Compression {
196        self.compression
197    }
198
199    /// Get the PCAP format.
200    pub fn pcap_format(&self) -> PcapFormat {
201        self.pcap_format
202    }
203
204    /// Check if this is a PCAPNG file.
205    pub fn is_pcapng(&self) -> bool {
206        self.pcap_format.is_pcapng()
207    }
208
209    /// Check if this file is compressed.
210    pub fn is_compressed(&self) -> bool {
211        self.compression.is_compressed()
212    }
213
214    /// Get the link type.
215    pub fn link_type(&self) -> u32 {
216        self.metadata.link_type
217    }
218}
219
220impl PacketSource for MmapPacketSource {
221    type Reader = MmapPacketReader;
222
223    fn metadata(&self) -> &PacketSourceMetadata {
224        &self.metadata
225    }
226
227    fn reader(&self, range: Option<&PacketRange>) -> Result<Self::Reader, Error> {
228        MmapPacketReader::new(
229            self.mmap.clone(),
230            self.compression,
231            self.pcap_format,
232            self.metadata.link_type,
233            range.cloned(),
234        )
235    }
236
237    fn partitions(&self, _max_partitions: usize) -> Result<Vec<PacketRange>, Error> {
238        // For now, return single partition
239        // Future optimization: scan for packet boundaries at byte offsets
240        Ok(vec![PacketRange::whole()])
241    }
242}
243
244impl std::fmt::Debug for MmapPacketSource {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        f.debug_struct("MmapPacketSource")
247            .field("path", &self.path)
248            .field("size_bytes", &self.metadata.size_bytes)
249            .field("link_type", &self.metadata.link_type)
250            .field("compression", &self.compression)
251            .field("pcap_format", &self.pcap_format)
252            .finish()
253    }
254}
255
256/// Memory-mapped packet reader.
257///
258/// Uses the unified layer stack:
259/// - `GenericPcapReader` for PCAP parsing
260/// - `DecompressReader` for transparent decompression
261/// - `Cursor<MmapSlice>` for reading from mmap
262pub struct MmapPacketReader {
263    /// The unified PCAP reader
264    inner: GenericPcapReader<DecompressReader<Cursor<MmapSlice>>>,
265    /// Link type (may be updated from PCAPNG interface description)
266    link_type: u32,
267    /// Current byte offset (for position tracking)
268    byte_offset: u64,
269    /// Optional range restriction
270    range: Option<PacketRange>,
271}
272
273impl MmapPacketReader {
274    fn new(
275        mmap: Arc<Mmap>,
276        compression: Compression,
277        pcap_format: PcapFormat,
278        link_type: u32,
279        range: Option<PacketRange>,
280    ) -> Result<Self, Error> {
281        // Create the layer stack: Cursor -> DecompressReader -> GenericPcapReader
282        let slice = MmapSlice::new(mmap);
283        let cursor = Cursor::new(slice);
284        let decompress = DecompressReader::new(cursor, compression).map_err(|e| {
285            Error::Pcap(PcapError::InvalidFormat {
286                reason: format!("Failed to create decompressor: {e}"),
287            })
288        })?;
289        let inner = GenericPcapReader::with_format(decompress, pcap_format)?;
290
291        Ok(Self {
292            inner,
293            link_type,
294            byte_offset: 0,
295            range,
296        })
297    }
298
299    /// Check if we've passed the end of our range.
300    #[inline]
301    fn past_range_end(&self) -> bool {
302        if let Some(ref range) = self.range {
303            if let Some(ref end) = range.end {
304                return self.inner.frame_count() >= end.frame_number;
305            }
306        }
307        false
308    }
309}
310
311impl PacketReader for MmapPacketReader {
312    fn process_packets<F>(&mut self, max: usize, mut f: F) -> Result<usize, Error>
313    where
314        F: FnMut(PacketRef<'_>) -> Result<(), Error>,
315    {
316        if self.past_range_end() {
317            return Ok(0);
318        }
319
320        // Calculate how many packets we can process before hitting range end
321        let effective_max = if let Some(ref range) = self.range {
322            if let Some(ref end) = range.end {
323                let remaining = end.frame_number.saturating_sub(self.inner.frame_count());
324                max.min(remaining as usize)
325            } else {
326                max
327            }
328        } else {
329            max
330        };
331
332        let count = self.inner.process_packets(effective_max, &mut f)?;
333
334        // Update link type from reader (may have been updated from PCAPNG IDB)
335        self.link_type = self.inner.link_type();
336
337        Ok(count)
338    }
339
340    fn position(&self) -> PacketPosition {
341        PacketPosition {
342            byte_offset: self.byte_offset,
343            frame_number: self.inner.frame_count(),
344        }
345    }
346
347    fn link_type(&self) -> u32 {
348        self.link_type
349    }
350}
351
352// Implement Unpin for async compatibility
353impl Unpin for MmapPacketReader {}
354
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Path to `name` inside the crate's `testdata/corpus` directory.
    fn test_pcap_path(name: &str) -> PathBuf {
        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("testdata")
            .join("corpus")
            .join(name)
    }

    /// Open a corpus file, or return `None` when it is absent so the calling
    /// test can skip. Panics if the file exists but cannot be opened.
    fn open_corpus(name: &str) -> Option<MmapPacketSource> {
        let path = test_pcap_path(name);
        if !path.exists() {
            eprintln!("Skipping test - {name} not found");
            return None;
        }
        match MmapPacketSource::open(&path) {
            Ok(source) => Some(source),
            Err(e) => panic!("Failed to open {}: {e:?}", path.display()),
        }
    }

    /// Read `reader` in batches of `batch` until `process_packets` reports 0,
    /// handing every packet to `on_packet`.
    ///
    /// Returns `(sum of process_packets return values, number of callbacks)`.
    fn drain<R: PacketReader>(
        reader: &mut R,
        batch: usize,
        mut on_packet: impl FnMut(PacketRef<'_>),
    ) -> (usize, usize) {
        let mut reported = 0;
        let mut seen = 0;
        loop {
            let processed = reader
                .process_packets(batch, |packet| {
                    seen += 1;
                    on_packet(packet);
                    Ok(())
                })
                .unwrap();
            if processed == 0 {
                return (reported, seen);
            }
            reported += processed;
        }
    }

    #[test]
    fn test_mmap_source_opens() {
        // `open_corpus` panics with the open error if the file exists but fails.
        if let Some(source) = open_corpus("dns.cap") {
            assert!(source.metadata().size_bytes.is_some());
        }
    }

    #[test]
    fn test_mmap_reader_reads_packets() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let mut reader = source.reader(None).unwrap();

        let mut found_packet = false;
        reader
            .process_packets(1, |packet| {
                assert_eq!(packet.frame_number, 1);
                assert!(!packet.data.is_empty());
                found_packet = true;
                Ok(())
            })
            .unwrap();
        assert!(found_packet);
    }

    #[test]
    fn test_mmap_reader_counts_match_file() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let mut reader = source.reader(None).unwrap();

        let (reported, seen) = drain(&mut reader, 100, |_| {});

        assert!(seen > 0, "Should have read some packets");
        // The count returned by process_packets must match the callbacks made.
        assert_eq!(reported, seen);
    }

    #[test]
    fn test_mmap_position_tracking() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let mut reader = source.reader(None).unwrap();

        assert_eq!(reader.position().frame_number, 0);

        reader.process_packets(1, |_| Ok(())).unwrap();
        assert_eq!(reader.position().frame_number, 1);
    }

    #[test]
    fn test_mmap_link_type() {
        let Some(source) = open_corpus("dns.cap") else { return };
        // dns.cap should have Ethernet link type (1)
        assert_eq!(source.metadata().link_type, 1);
    }

    #[test]
    fn test_mmap_netlink_link_type() {
        let Some(source) = open_corpus("nlmon-big.pcap") else { return };
        // nlmon-big.pcap should have NETLINK link type (253)
        assert_eq!(
            source.link_type(),
            253,
            "Expected LINKTYPE_NETLINK (253), got {}",
            source.link_type()
        );
    }

    #[test]
    fn test_mmap_debug_format() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let debug_str = format!("{:?}", source);
        assert!(debug_str.contains("MmapPacketSource"));
        assert!(debug_str.contains("dns.cap"));
    }

    #[test]
    fn test_mmap_packet_data_integrity() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let mut reader = source.reader(None).unwrap();

        // Read the first packet and validate its Ethernet header structure.
        reader
            .process_packets(1, |packet| {
                // Minimum Ethernet frame is 14 bytes (MAC dst + MAC src + EtherType)
                assert!(packet.data.len() >= 14, "Packet too small for Ethernet");
                assert_eq!(packet.captured_len as usize, packet.data.len());
                assert!(packet.original_len >= packet.captured_len);
                assert_eq!(packet.frame_number, 1);
                // Timestamp should be after the Unix epoch
                assert!(packet.timestamp_us > 0);
                Ok(())
            })
            .unwrap();
    }

    #[test]
    fn test_mmap_all_packets_valid() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let mut reader = source.reader(None).unwrap();

        let mut prev_frame = 0u64;
        let (_, seen) = drain(&mut reader, 100, |packet| {
            assert_eq!(
                packet.frame_number,
                prev_frame + 1,
                "Frame numbers should be sequential"
            );
            prev_frame = packet.frame_number;

            assert!(packet.data.len() <= 65535, "Packet exceeds max size");
            assert_eq!(packet.captured_len as usize, packet.data.len());
        });

        assert!(seen > 0, "Should have read packets");
    }

    #[test]
    fn test_mmap_timestamp_reasonableness() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let mut reader = source.reader(None).unwrap();

        let mut prev_timestamp = 0i64;
        drain(&mut reader, 100, |packet| {
            assert!(packet.timestamp_us >= 0);

            // Allow small backward jumps (up to 1 second) for clock drift.
            if prev_timestamp > 0 {
                let diff = packet.timestamp_us - prev_timestamp;
                assert!(
                    diff >= -1_000_000,
                    "Timestamp went backwards by {} us",
                    diff.abs()
                );
            }
            prev_timestamp = packet.timestamp_us;
        });
    }

    #[test]
    fn test_mmap_range_reading() {
        let Some(source) = open_corpus("dns.cap") else { return };

        // Count the whole file first so the expectation holds for any capture size.
        let (_, total) = drain(&mut source.reader(None).unwrap(), 100, |_| {});

        // Range ending at position 10 (exclusive): frames 1..=10 are delivered.
        let range = PacketRange {
            start: PacketPosition {
                byte_offset: 0,
                frame_number: 0,
            },
            end: Some(PacketPosition {
                byte_offset: 0,
                frame_number: 10,
            }),
        };
        let mut reader = source.reader(Some(&range)).unwrap();

        let mut frames = Vec::new();
        drain(&mut reader, 100, |packet| frames.push(packet.frame_number));

        let expected = total.min(10);
        assert_eq!(
            frames.len(),
            expected,
            "Should respect range end limit, got {:?}",
            frames
        );
        assert!(
            frames.iter().copied().eq(1..=expected as u64),
            "Unexpected frame numbers: {:?}",
            frames
        );
    }

    #[test]
    fn test_mmap_metadata() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let meta = source.metadata();

        assert_eq!(meta.link_type, 1); // Ethernet
        assert!(meta.size_bytes.is_some());
        assert!(meta.size_bytes.unwrap() > 0);
    }

    #[test]
    fn test_mmap_clone() {
        let Some(source1) = open_corpus("dns.cap") else { return };
        let source2 = source1.clone();

        // Read the first packet (frame number and bytes) from a fresh reader.
        let first_packet = |source: &MmapPacketSource| {
            let mut reader = source.reader(None).unwrap();
            let mut captured = None;
            reader
                .process_packets(1, |p| {
                    captured = Some((p.frame_number, p.data.to_vec()));
                    Ok(())
                })
                .unwrap();
            captured
        };

        // Both sources should work independently and yield identical packets.
        let first1 = first_packet(&source1);
        let first2 = first_packet(&source2);
        assert!(first1.is_some(), "Should have read a packet");
        assert_eq!(first1, first2);
    }

    #[test]
    fn test_mmap_partitions() {
        let Some(source) = open_corpus("dns.cap") else { return };
        let partitions = source.partitions(4).unwrap();

        // Currently returns a single partition
        assert_eq!(partitions.len(), 1);
    }
}