1use std::fs::File;
28use std::io::Cursor;
29use std::path::{Path, PathBuf};
30use std::sync::Arc;
31
32use memmap2::Mmap;
33
34use crate::error::{Error, PcapError};
35
36use super::decompress::{Compression, DecompressReader, MmapSlice};
37use super::pcap_stream::{GenericPcapReader, PcapFormat};
38use super::{
39 PacketPosition, PacketRange, PacketReader, PacketRef, PacketSource, PacketSourceMetadata,
40};
41
/// Packet source backed by a memory map of a capture file on disk.
///
/// The mapping is held behind an [`Arc`], so clones of the source (and every
/// reader created from it) share the same mapped region instead of mapping
/// the file again.
#[derive(Clone)]
pub struct MmapPacketSource {
    /// Path the capture was opened from.
    path: PathBuf,
    /// Shared mapping of the raw file bytes (still compressed if the file is).
    mmap: Arc<Mmap>,
    /// Metadata assembled once when the source is opened.
    metadata: PacketSourceMetadata,
    /// Compression scheme detected from the mapped bytes.
    compression: Compression,
    /// Capture format detected from the file header.
    pcap_format: PcapFormat,
}
59
60impl MmapPacketSource {
61 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
65 let path = path.as_ref().to_path_buf();
66 let file = File::open(&path).map_err(Error::Io)?;
67
68 let mmap = unsafe { Mmap::map(&file).map_err(Error::Io)? };
70
71 let compression = Compression::detect(&mmap);
73
74 let (pcap_format, link_type) = if compression.is_compressed() {
76 Self::detect_format_compressed(&mmap, compression)?
77 } else {
78 Self::detect_format_uncompressed(&mmap)?
79 };
80
81 let metadata = PacketSourceMetadata {
82 link_type,
83 snaplen: 65535,
84 size_bytes: Some(mmap.len() as u64),
85 packet_count: None,
86 seekable: !compression.is_compressed(), };
88
89 Ok(Self {
90 path,
91 mmap: Arc::new(mmap),
92 metadata,
93 compression,
94 pcap_format,
95 })
96 }
97
98 fn detect_format_uncompressed(data: &[u8]) -> Result<(PcapFormat, u32), Error> {
100 if data.len() < 24 {
101 return Err(Error::Pcap(PcapError::InvalidFormat {
102 reason: "File too small for PCAP header".into(),
103 }));
104 }
105
106 let format = PcapFormat::detect(data)?;
107 let link_type = if format.is_pcapng() {
108 1 } else {
110 Self::link_type_from_header(data, &format)
111 };
112
113 Ok((format, link_type))
114 }
115
116 fn link_type_from_header(data: &[u8], format: &PcapFormat) -> u32 {
118 if data.len() < 24 {
119 return 1; }
121 let byte_swap = matches!(format, PcapFormat::LegacyBeMicro | PcapFormat::LegacyBeNano);
123 if byte_swap {
124 u32::from_be_bytes([data[20], data[21], data[22], data[23]])
125 } else {
126 u32::from_le_bytes([data[20], data[21], data[22], data[23]])
127 }
128 }
129
130 fn detect_format_compressed(
132 data: &[u8],
133 compression: Compression,
134 ) -> Result<(PcapFormat, u32), Error> {
135 use std::io::Read;
136
137 let mut decoder: Box<dyn Read> = match compression {
139 Compression::None => unreachable!(),
140 Compression::Gzip => {
141 let cursor = Cursor::new(data);
142 let gz = flate2::read::GzDecoder::new(cursor);
143 Box::new(gz) as Box<dyn Read>
144 }
145 #[cfg(feature = "compress-zstd")]
146 Compression::Zstd => {
147 let cursor = Cursor::new(data);
148 let zstd = zstd::Decoder::new(cursor)?;
149 Box::new(zstd) as Box<dyn Read>
150 }
151 #[cfg(feature = "compress-lz4")]
152 Compression::Lz4 => {
153 let cursor = Cursor::new(data);
154 let lz4 = lz4_flex::frame::FrameDecoder::new(cursor);
155 Box::new(lz4) as Box<dyn Read>
156 }
157 #[cfg(feature = "compress-bzip2")]
158 Compression::Bzip2 => {
159 let cursor = Cursor::new(data);
160 let bz2 = bzip2::read::BzDecoder::new(cursor);
161 Box::new(bz2) as Box<dyn Read>
162 }
163 #[cfg(feature = "compress-xz")]
164 Compression::Xz => {
165 let cursor = Cursor::new(data);
166 let xz = xz2::read::XzDecoder::new(cursor);
167 Box::new(xz) as Box<dyn Read>
168 }
169 };
170
171 let mut header = [0u8; 24];
173 decoder.read_exact(&mut header).map_err(|e| {
174 Error::Pcap(PcapError::InvalidFormat {
175 reason: format!("Failed to read compressed header: {e}"),
176 })
177 })?;
178
179 let format = PcapFormat::detect(&header)?;
180 let link_type = if format.is_pcapng() {
181 1 } else {
183 Self::link_type_from_header(&header, &format)
184 };
185
186 Ok((format, link_type))
187 }
188
189 pub fn path(&self) -> &Path {
191 &self.path
192 }
193
194 pub fn compression(&self) -> Compression {
196 self.compression
197 }
198
199 pub fn pcap_format(&self) -> PcapFormat {
201 self.pcap_format
202 }
203
204 pub fn is_pcapng(&self) -> bool {
206 self.pcap_format.is_pcapng()
207 }
208
209 pub fn is_compressed(&self) -> bool {
211 self.compression.is_compressed()
212 }
213
214 pub fn link_type(&self) -> u32 {
216 self.metadata.link_type
217 }
218}
219
220impl PacketSource for MmapPacketSource {
221 type Reader = MmapPacketReader;
222
223 fn metadata(&self) -> &PacketSourceMetadata {
224 &self.metadata
225 }
226
227 fn reader(&self, range: Option<&PacketRange>) -> Result<Self::Reader, Error> {
228 MmapPacketReader::new(
229 self.mmap.clone(),
230 self.compression,
231 self.pcap_format,
232 self.metadata.link_type,
233 range.cloned(),
234 )
235 }
236
237 fn partitions(&self, _max_partitions: usize) -> Result<Vec<PacketRange>, Error> {
238 Ok(vec![PacketRange::whole()])
241 }
242}
243
244impl std::fmt::Debug for MmapPacketSource {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 f.debug_struct("MmapPacketSource")
247 .field("path", &self.path)
248 .field("size_bytes", &self.metadata.size_bytes)
249 .field("link_type", &self.metadata.link_type)
250 .field("compression", &self.compression)
251 .field("pcap_format", &self.pcap_format)
252 .finish()
253 }
254}
255
/// Reader that pulls packets out of a shared memory map.
///
/// Created through `MmapPacketSource::reader`.
pub struct MmapPacketReader {
    /// Format-aware reader stacked on a decompressor, which in turn reads
    /// from a cursor over the mapped bytes.
    inner: GenericPcapReader<DecompressReader<Cursor<MmapSlice>>>,
    /// Cached link type: seeded by the constructor and refreshed from `inner`
    /// after each processed batch.
    link_type: u32,
    /// Byte offset reported by `position()`. It is initialised to 0 and is
    /// not advanced anywhere in the code visible in this file.
    byte_offset: u64,
    /// Optional range restricting the reader. Only the end position's
    /// `frame_number` is consulted by the code visible in this file.
    range: Option<PacketRange>,
}
272
273impl MmapPacketReader {
274 fn new(
275 mmap: Arc<Mmap>,
276 compression: Compression,
277 pcap_format: PcapFormat,
278 link_type: u32,
279 range: Option<PacketRange>,
280 ) -> Result<Self, Error> {
281 let slice = MmapSlice::new(mmap);
283 let cursor = Cursor::new(slice);
284 let decompress = DecompressReader::new(cursor, compression).map_err(|e| {
285 Error::Pcap(PcapError::InvalidFormat {
286 reason: format!("Failed to create decompressor: {e}"),
287 })
288 })?;
289 let inner = GenericPcapReader::with_format(decompress, pcap_format)?;
290
291 Ok(Self {
292 inner,
293 link_type,
294 byte_offset: 0,
295 range,
296 })
297 }
298
299 #[inline]
301 fn past_range_end(&self) -> bool {
302 if let Some(ref range) = self.range {
303 if let Some(ref end) = range.end {
304 return self.inner.frame_count() >= end.frame_number;
305 }
306 }
307 false
308 }
309}
310
311impl PacketReader for MmapPacketReader {
312 fn process_packets<F>(&mut self, max: usize, mut f: F) -> Result<usize, Error>
313 where
314 F: FnMut(PacketRef<'_>) -> Result<(), Error>,
315 {
316 if self.past_range_end() {
317 return Ok(0);
318 }
319
320 let effective_max = if let Some(ref range) = self.range {
322 if let Some(ref end) = range.end {
323 let remaining = end.frame_number.saturating_sub(self.inner.frame_count());
324 max.min(remaining as usize)
325 } else {
326 max
327 }
328 } else {
329 max
330 };
331
332 let count = self.inner.process_packets(effective_max, &mut f)?;
333
334 self.link_type = self.inner.link_type();
336
337 Ok(count)
338 }
339
340 fn position(&self) -> PacketPosition {
341 PacketPosition {
342 byte_offset: self.byte_offset,
343 frame_number: self.inner.frame_count(),
344 }
345 }
346
347 fn link_type(&self) -> u32 {
348 self.link_type
349 }
350}
351
// Explicitly marks the reader as `Unpin`, independent of whether the wrapped
// reader types are `Unpin` themselves.
// NOTE(review): presumably added so the reader can be moved freely when held
// across pinned/async contexts — confirm the original motivation.
impl Unpin for MmapPacketReader {}
354
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Capture used by most tests in this module.
    const DNS_CAPTURE: &str = "dns.cap";

    /// Resolves `name` inside the crate's `testdata/corpus` directory.
    fn test_pcap_path(name: &str) -> PathBuf {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("testdata");
        path.push("corpus");
        path.push(name);
        path
    }

    /// Opens the DNS capture, or returns `None` when the fixture is absent so
    /// the calling test can skip instead of failing.
    fn open_dns_capture() -> Option<MmapPacketSource> {
        let path = test_pcap_path(DNS_CAPTURE);
        if !path.exists() {
            return None;
        }
        Some(MmapPacketSource::open(&path).unwrap())
    }

    /// Opening an existing capture succeeds.
    #[test]
    fn test_mmap_source_opens() {
        let path = test_pcap_path(DNS_CAPTURE);
        if !path.exists() {
            return;
        }

        let opened = MmapPacketSource::open(&path);
        assert!(opened.is_ok(), "Failed to open: {:?}", opened.err());
    }

    /// The first packet is delivered as frame 1 with a non-empty payload.
    #[test]
    fn test_mmap_reader_reads_packets() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let mut reader = source.reader(None).unwrap();

        let mut saw_packet = false;
        reader
            .process_packets(1, |pkt| {
                assert_eq!(pkt.frame_number, 1);
                assert!(!pkt.data.is_empty());
                saw_packet = true;
                Ok(())
            })
            .unwrap();
        assert!(saw_packet);
    }

    /// Draining the reader in batches yields at least one packet.
    #[test]
    fn test_mmap_reader_counts_match_file() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let mut reader = source.reader(None).unwrap();

        let mut total = 0;
        while reader
            .process_packets(100, |_| {
                total += 1;
                Ok(())
            })
            .unwrap()
            != 0
        {}

        assert!(total > 0, "Should have read some packets");
    }

    /// The reported frame number starts at 0 and advances with each packet.
    #[test]
    fn test_mmap_position_tracking() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let mut reader = source.reader(None).unwrap();

        let before = reader.position();
        assert_eq!(before.frame_number, 0);

        reader.process_packets(1, |_| Ok(())).unwrap();
        let after = reader.position();
        assert_eq!(after.frame_number, 1);
    }

    /// The DNS capture reports link type 1.
    #[test]
    fn test_mmap_link_type() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        assert_eq!(source.metadata().link_type, 1);
    }

    /// The netlink capture reports link type 253.
    #[test]
    fn test_mmap_netlink_link_type() {
        let path = test_pcap_path("nlmon-big.pcap");
        if !path.exists() {
            eprintln!("Skipping test - nlmon-big.pcap not found");
            return;
        }

        let source = MmapPacketSource::open(&path).unwrap();
        let link_type = source.link_type();
        assert_eq!(
            link_type,
            253,
            "Expected LINKTYPE_NETLINK (253), got {}",
            source.link_type()
        );
    }

    /// The `Debug` output names the type and the file.
    #[test]
    fn test_mmap_debug_format() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };

        let rendered = format!("{:?}", source);
        assert!(rendered.contains("MmapPacketSource"));
        assert!(rendered.contains("dns.cap"));
    }

    /// Basic invariants of the first packet's fields.
    #[test]
    fn test_mmap_packet_data_integrity() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let mut reader = source.reader(None).unwrap();

        reader
            .process_packets(1, |pkt| {
                assert!(pkt.data.len() >= 14, "Packet too small for Ethernet");
                assert_eq!(pkt.captured_len as usize, pkt.data.len());
                assert!(pkt.original_len >= pkt.captured_len);
                assert_eq!(pkt.frame_number, 1);
                assert!(pkt.timestamp_us > 0);
                Ok(())
            })
            .unwrap();
    }

    /// Every packet has a consecutive frame number and a consistent length.
    #[test]
    fn test_mmap_all_packets_valid() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let mut reader = source.reader(None).unwrap();

        let mut last_frame = 0u64;
        let mut total = 0;

        while reader
            .process_packets(100, |pkt| {
                assert_eq!(pkt.frame_number, last_frame + 1);
                last_frame = pkt.frame_number;

                assert!(pkt.data.len() <= 65535, "Packet exceeds max size");
                assert_eq!(pkt.captured_len as usize, pkt.data.len());

                total += 1;
                Ok(())
            })
            .unwrap()
            != 0
        {}

        assert!(total > 0, "Should have read packets");
    }

    /// Timestamps are non-negative and never jump back by more than a second.
    #[test]
    fn test_mmap_timestamp_reasonableness() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let mut reader = source.reader(None).unwrap();

        let mut last_timestamp = 0i64;

        while reader
            .process_packets(100, |pkt| {
                assert!(pkt.timestamp_us >= 0);

                if last_timestamp > 0 {
                    let delta = pkt.timestamp_us - last_timestamp;
                    assert!(
                        delta >= -1_000_000,
                        "Timestamp went backwards by {} us",
                        delta.abs()
                    );
                }
                last_timestamp = pkt.timestamp_us;
                Ok(())
            })
            .unwrap()
            != 0
        {}
    }

    /// A range ending at frame 10 never yields more than 10 packets.
    #[test]
    fn test_mmap_range_reading() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };

        let start = PacketPosition {
            byte_offset: 0,
            frame_number: 0,
        };
        let end = PacketPosition {
            byte_offset: 0,
            frame_number: 10,
        };
        let range = PacketRange {
            start,
            end: Some(end),
        };

        let mut reader = source.reader(Some(&range)).unwrap();

        let mut total = 0;
        loop {
            let batch = reader
                .process_packets(100, |_| {
                    total += 1;
                    Ok(())
                })
                .unwrap();
            // The second condition is a safety valve against a reader that
            // ignores the range and never reports an empty batch.
            if batch == 0 || total > 100 {
                break;
            }
        }

        assert!(total <= 10, "Should respect range end limit, got {}", total);
    }

    /// Metadata carries the link type and a non-zero file size.
    #[test]
    fn test_mmap_metadata() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let meta = source.metadata();

        assert_eq!(meta.link_type, 1);
        assert!(meta.size_bytes.is_some());
        assert!(meta.size_bytes.unwrap() > 0);
    }

    /// Readers from a cloned source see the same first packet.
    #[test]
    fn test_mmap_clone() {
        let original = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let duplicate = original.clone();

        let mut reader_a = original.reader(None).unwrap();
        let mut reader_b = duplicate.reader(None).unwrap();

        let mut first_a = (0u64, 0usize);
        let mut first_b = (0u64, 0usize);

        reader_a
            .process_packets(1, |p| {
                first_a = (p.frame_number, p.data.len());
                Ok(())
            })
            .unwrap();
        reader_b
            .process_packets(1, |p| {
                first_b = (p.frame_number, p.data.len());
                Ok(())
            })
            .unwrap();

        assert_eq!(first_a.0, first_b.0);
        assert_eq!(first_a.1, first_b.1);
    }

    /// The source reports exactly one partition regardless of the request.
    #[test]
    fn test_mmap_partitions() {
        let source = match open_dns_capture() {
            Some(source) => source,
            None => return,
        };
        let partitions = source.partitions(4).unwrap();

        assert_eq!(partitions.len(), 1);
    }
}