pcapsql_core/io/
source.rs

1//! Packet source abstractions and implementations.
2//!
3//! This module defines traits for abstracting over packet sources (files, memory-mapped
4//! files, cloud storage, etc.) and provides implementations for common sources.
5//!
6//! ## Design Principles
7//!
8//! - Generic traits with associated types (no `Box<dyn>` in hot path)
9//! - Matches existing enum-dispatch pattern used for protocols
10//! - Supports future backends (mmap, S3) without trait changes
11//! - Type erasure happens at DataFusion boundaries, not in hot path
12
13use std::path::{Path, PathBuf};
14
15use bytes::Bytes;
16
17use crate::error::Error;
18use crate::pcap::PcapReader;
19
/// Zero-copy, borrowed view of a single packet.
///
/// Values of this type are handed to the callback given to
/// `process_packets()`. The `data` slice is only valid while that callback
/// runs, so anything needed afterwards must be parsed or copied out (for
/// example into Arrow buffers) before the callback returns.
#[derive(Clone, Copy, Debug)]
pub struct PacketRef<'a> {
    /// 1-indexed frame number (same numbering as Wireshark).
    pub frame_number: u64,
    /// Capture timestamp, in microseconds since the Unix epoch.
    pub timestamp_us: i64,
    /// Number of bytes actually captured; can be smaller than `original_len`.
    pub captured_len: u32,
    /// Length of the packet as it appeared on the wire.
    pub original_len: u32,
    /// Link-layer type (for example, 1 = Ethernet).
    pub link_type: u16,
    /// Packet bytes, borrowed from the pcap_parser buffer.
    pub data: &'a [u8],
}

impl PacketRef<'_> {
    /// Returns `true` when fewer bytes were captured than were on the wire.
    #[inline]
    pub fn is_truncated(&self) -> bool {
        self.original_len > self.captured_len
    }
}
48
/// A location inside a packet source, usable for seeking and checkpointing.
///
/// Note that the derived `Default` yields frame number 0, which is not the
/// same as [`PacketPosition::START`] (frame number 1).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PacketPosition {
    /// Offset, in bytes, into the underlying source.
    pub byte_offset: u64,
    /// 1-indexed frame number at this position (same numbering as Wireshark).
    pub frame_number: u64,
}

impl PacketPosition {
    /// The position of the very first frame: byte offset 0, frame number 1.
    pub const START: Self = PacketPosition {
        frame_number: 1,
        byte_offset: 0,
    };
}
65
66/// Range of packets for partitioning.
67#[derive(Clone, Debug)]
68pub struct PacketRange {
69    /// Start position (inclusive)
70    pub start: PacketPosition,
71    /// End position (exclusive). None means read to EOF.
72    pub end: Option<PacketPosition>,
73}
74
75impl PacketRange {
76    /// Range covering the entire source
77    pub fn whole() -> Self {
78        Self {
79            start: PacketPosition::START,
80            end: None,
81        }
82    }
83
84    /// Check if a frame number is within this range
85    pub fn contains(&self, frame_number: u64) -> bool {
86        frame_number >= self.start.frame_number
87            && self
88                .end
89                .as_ref()
90                .is_none_or(|e| frame_number < e.frame_number)
91    }
92}
93
/// Descriptive information about a packet source.
#[derive(Debug, Clone)]
pub struct PacketSourceMetadata {
    /// Link-layer type of the source (for example, 1 = Ethernet).
    pub link_type: u32,
    /// Snapshot length.
    pub snaplen: u32,
    /// Total size of the source in bytes, when it is known.
    pub size_bytes: Option<u64>,
    /// Total number of packets, when it is known (e.g. from an index).
    pub packet_count: Option<u64>,
    /// `true` if the source supports seeking.
    pub seekable: bool,
}
108
109/// Raw packet data from a reader.
110#[derive(Clone, Debug)]
111pub struct RawPacket {
112    /// Frame number (1-indexed)
113    pub frame_number: u64,
114    /// Timestamp in microseconds since Unix epoch
115    pub timestamp_us: i64,
116    /// Captured length (may be less than original)
117    pub captured_length: u32,
118    /// Original packet length on the wire
119    pub original_length: u32,
120    /// Link layer type (e.g., 1 = Ethernet)
121    pub link_type: u16,
122    /// Packet data (potentially zero-copy with Bytes)
123    pub data: Bytes,
124}
125
126impl RawPacket {
127    /// Create a new raw packet from owned data.
128    pub fn new(
129        frame_number: u64,
130        timestamp_us: i64,
131        captured_length: u32,
132        original_length: u32,
133        link_type: u16,
134        data: Vec<u8>,
135    ) -> Self {
136        Self {
137            frame_number,
138            timestamp_us,
139            captured_length,
140            original_length,
141            link_type,
142            data: Bytes::from(data),
143        }
144    }
145
146    /// Create a new raw packet from Bytes (zero-copy if already Bytes).
147    pub fn from_bytes(
148        frame_number: u64,
149        timestamp_us: i64,
150        captured_length: u32,
151        original_length: u32,
152        link_type: u16,
153        data: Bytes,
154    ) -> Self {
155        Self {
156            frame_number,
157            timestamp_us,
158            captured_length,
159            original_length,
160            link_type,
161            data,
162        }
163    }
164
165    /// Check if the packet was truncated during capture.
166    pub fn is_truncated(&self) -> bool {
167        self.captured_length < self.original_length
168    }
169}
170
171/// Source of packet data. Creates readers and computes partitions.
172///
173/// This trait uses an associated type for `Reader` to enable static dispatch
174/// in the hot path, matching the enum-dispatch pattern used for protocols.
175///
176/// # Design Notes
177///
178/// We use generics rather than `Box<dyn PacketReader>` because:
179/// 1. Each QueryEngine uses ONE source type (no heterogeneous mixing)
180/// 2. The hot loop uses `reader.process_packets()` for zero-copy access
181/// 3. Static dispatch enables inlining and optimization
182/// 4. Type erasure happens at DataFusion boundaries anyway
183pub trait PacketSource: Send + Sync + Clone + 'static {
184    /// The reader type this source produces
185    type Reader: PacketReader;
186
187    /// Get metadata about this source
188    fn metadata(&self) -> &PacketSourceMetadata;
189
190    /// Create a reader for the given range.
191    /// If range is None, reads the entire source.
192    fn reader(&self, range: Option<&PacketRange>) -> Result<Self::Reader, Error>;
193
194    /// Compute partition boundaries for parallel reading.
195    ///
196    /// Returns at most `max_partitions` non-overlapping ranges that cover
197    /// the entire source. The default implementation returns a single
198    /// partition (the whole source).
199    ///
200    /// # Phase 2.5
201    ///
202    /// This default implementation is sufficient for Phase 2.
203    /// Phase 2.5 will override this to scan the file and find
204    /// packet boundaries at approximately equal byte offsets.
205    fn partitions(&self, _max_partitions: usize) -> Result<Vec<PacketRange>, Error> {
206        Ok(vec![PacketRange::whole()])
207    }
208
209    /// Get the link type for this source.
210    fn link_type(&self) -> u32 {
211        self.metadata().link_type
212    }
213}
214
/// Sequential reader of packets from a source.
///
/// This is the hot path - implementations should be optimized for
/// sequential reading with minimal overhead per packet.
///
/// ## Zero-Copy API
///
/// Uses `process_packets()` with a callback pattern to avoid copying
/// packet data. The callback receives borrowed packet data that is
/// only valid during the callback invocation.
pub trait PacketReader: Send + Unpin {
    /// Process up to `max` packets with borrowed data via callback.
    ///
    /// The callback receives a [`PacketRef`] by value and must process it
    /// (parse, add to Arrow builders, etc.) before returning. The borrow
    /// of the packet data is only valid during the callback.
    ///
    /// Returns the number of packets processed (may be less than `max` at EOF).
    ///
    /// # Errors
    ///
    /// Returns an `Error` when reading fails.
    /// NOTE(review): the trait does not spell out what happens when the
    /// callback itself returns `Err` - presumably the error is propagated
    /// and the batch stops; confirm against the implementations.
    ///
    /// # Example
    ///
    /// ```ignore
    /// reader.process_packets(1024, |packet| {
    ///     // packet.data is borrowed - must be processed here
    ///     let parsed = parse_packet(packet.link_type, packet.data);
    ///     builder.add_row(packet.frame_number, &parsed);
    ///     Ok(())
    /// })?;
    /// ```
    fn process_packets<F>(&mut self, max: usize, f: F) -> Result<usize, Error>
    where
        F: FnMut(PacketRef<'_>) -> Result<(), Error>;

    /// Current position in the source, returned by value.
    fn position(&self) -> PacketPosition;

    /// Get the link type for packets from this reader.
    fn link_type(&self) -> u32;
}
254
255/// Packet source backed by a PCAP/PCAPNG file.
256#[derive(Clone)]
257pub struct FilePacketSource {
258    path: PathBuf,
259    metadata: PacketSourceMetadata,
260}
261
262impl FilePacketSource {
263    /// Open a PCAP file as a packet source.
264    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
265        let path = path.as_ref().to_path_buf();
266
267        // Open once to get link type and other metadata
268        let reader = PcapReader::open(&path)?;
269        let link_type = reader.link_type() as u32;
270
271        let size_bytes = std::fs::metadata(&path).ok().map(|m| m.len());
272
273        let metadata = PacketSourceMetadata {
274            link_type,
275            snaplen: 65535, // Default, could read from header
276            size_bytes,
277            packet_count: None, // Would require scanning the file
278            seekable: true,
279        };
280
281        Ok(Self { path, metadata })
282    }
283
284    /// Get the path to the file.
285    pub fn file_path(&self) -> &Path {
286        &self.path
287    }
288}
289
290impl PacketSource for FilePacketSource {
291    type Reader = FilePacketReader;
292
293    fn metadata(&self) -> &PacketSourceMetadata {
294        &self.metadata
295    }
296
297    fn reader(&self, range: Option<&PacketRange>) -> Result<Self::Reader, Error> {
298        FilePacketReader::open(&self.path, self.metadata.link_type, range)
299    }
300
301    // partitions() uses default implementation (single partition)
302    // Phase 2.5 will override this
303}
304
305/// Sequential packet reader for PCAP files.
306pub struct FilePacketReader {
307    inner: PcapReader,
308    link_type: u32,
309    position: PacketPosition,
310    range: Option<PacketRange>,
311}
312
313impl FilePacketReader {
314    /// Open a reader starting at the given position.
315    fn open(path: &Path, link_type: u32, range: Option<&PacketRange>) -> Result<Self, Error> {
316        let inner = PcapReader::open(path)?;
317
318        // If starting position specified with frame > 1, we'd need to scan
319        // For now, we always start from the beginning
320        let position = if let Some(r) = range {
321            if r.start.frame_number > 1 {
322                tracing::warn!(
323                    "FilePacketReader doesn't support seeking to frame {}, starting from beginning",
324                    r.start.frame_number
325                );
326            }
327            r.start.clone()
328        } else {
329            PacketPosition::START
330        };
331
332        Ok(Self {
333            inner,
334            link_type,
335            position,
336            range: range.cloned(),
337        })
338    }
339
340    /// Check if we've reached the end of our range
341    #[inline]
342    fn past_range_end(&self) -> bool {
343        if let Some(ref range) = self.range {
344            if let Some(ref end) = range.end {
345                return self.position.frame_number >= end.frame_number;
346            }
347        }
348        false
349    }
350
351    /// Skip packets until we reach the range start.
352    fn skip_to_range_start(&mut self) -> Result<(), Error> {
353        while self.inner.frame_count() + 1 < self.position.frame_number {
354            // Use process_packets to skip one packet at a time
355            let processed = self.inner.process_packets(1, |_| Ok(()))?;
356            if processed == 0 {
357                break; // EOF
358            }
359        }
360        Ok(())
361    }
362}
363
364impl PacketReader for FilePacketReader {
365    fn process_packets<F>(&mut self, max: usize, mut f: F) -> Result<usize, Error>
366    where
367        F: FnMut(PacketRef<'_>) -> Result<(), Error>,
368    {
369        // Check range bounds
370        if self.past_range_end() {
371            return Ok(0);
372        }
373
374        // Skip packets before our range start (using internal skip method)
375        self.skip_to_range_start()?;
376
377        // Calculate how many packets we can process before hitting range end
378        let effective_max = if let Some(ref range) = self.range {
379            if let Some(ref end) = range.end {
380                let remaining = end.frame_number.saturating_sub(self.inner.frame_count());
381                max.min(remaining as usize)
382            } else {
383                max
384            }
385        } else {
386            max
387        };
388
389        // Delegate to underlying PcapReader's zero-copy implementation
390        let count = self.inner.process_packets(effective_max, |packet| {
391            // Update position tracking
392            self.position.frame_number = packet.frame_number + 1;
393            f(packet)
394        })?;
395
396        Ok(count)
397    }
398
399    #[inline]
400    fn position(&self) -> PacketPosition {
401        self.position.clone()
402    }
403
404    fn link_type(&self) -> u32 {
405        self.link_type
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a position at byte offset 0 for the given frame number.
    fn at_frame(frame_number: u64) -> PacketPosition {
        PacketPosition {
            byte_offset: 0,
            frame_number,
        }
    }

    /// `whole()` starts at `START` and has no end bound.
    #[test]
    fn test_packet_range_whole() {
        let PacketRange { start, end } = PacketRange::whole();
        assert_eq!(start, PacketPosition::START);
        assert!(end.is_none());
    }

    /// `START` is frame 1 at byte offset 0.
    #[test]
    fn test_packet_position_start() {
        let PacketPosition {
            byte_offset,
            frame_number,
        } = PacketPosition::START;
        assert_eq!(frame_number, 1);
        assert_eq!(byte_offset, 0);
    }

    /// A bounded range includes its start frame and excludes its end frame.
    #[test]
    fn test_packet_range_contains() {
        let range = PacketRange {
            start: at_frame(5),
            end: Some(at_frame(10)),
        };

        for (frame, expected) in [(4, false), (5, true), (9, true), (10, false)] {
            assert_eq!(range.contains(frame), expected, "frame {frame}");
        }
    }

    /// An open-ended range contains every frame from its start onwards.
    #[test]
    fn test_packet_range_contains_no_end() {
        let range = PacketRange {
            start: at_frame(5),
            end: None,
        };

        for (frame, expected) in [(4, false), (5, true), (1000, true)] {
            assert_eq!(range.contains(frame), expected, "frame {frame}");
        }
    }
}