pcapsql_core/io/source.rs
1//! Packet source abstractions and implementations.
2//!
3//! This module defines traits for abstracting over packet sources (files, memory-mapped
4//! files, cloud storage, etc.) and provides implementations for common sources.
5//!
6//! ## Design Principles
7//!
8//! - Generic traits with associated types (no `Box<dyn>` in hot path)
9//! - Matches existing enum-dispatch pattern used for protocols
10//! - Supports future backends (mmap, S3) without trait changes
11//! - Type erasure happens at DataFusion boundaries, not in hot path
12
13use std::path::{Path, PathBuf};
14
15use bytes::Bytes;
16
17use crate::error::Error;
18use crate::pcap::PcapReader;
19
/// Zero-copy, borrowed view of a single packet inside the pcap_parser buffer.
///
/// Values of this type are handed to the callback given to
/// `process_packets()`. The `data` slice is only valid while that callback
/// runs, so it has to be consumed (parsed, copied into Arrow buffers, ...)
/// before the callback returns.
#[derive(Clone, Copy, Debug)]
pub struct PacketRef<'a> {
    /// 1-based frame number (same numbering as Wireshark).
    pub frame_number: u64,
    /// Capture timestamp, in microseconds since the Unix epoch.
    pub timestamp_us: i64,
    /// Number of bytes captured; can be smaller than `original_len`.
    pub captured_len: u32,
    /// Length of the packet as it appeared on the wire.
    pub original_len: u32,
    /// Link-layer type (for example, 1 = Ethernet).
    pub link_type: u16,
    /// Packet bytes, borrowed from the pcap_parser buffer.
    pub data: &'a [u8],
}

impl PacketRef<'_> {
    /// Returns `true` when fewer bytes were captured than were on the wire.
    #[inline]
    pub fn is_truncated(&self) -> bool {
        self.original_len > self.captured_len
    }
}
48
/// A location inside a packet source, usable for seeking or checkpointing.
///
/// Note that the derived `Default` yields `frame_number == 0`, which is not
/// the same value as [`PacketPosition::START`] (frame 1).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PacketPosition {
    /// Offset, in bytes, into the underlying source.
    pub byte_offset: u64,
    /// Frame number at this position (1-based, same numbering as Wireshark).
    pub frame_number: u64,
}

impl PacketPosition {
    /// The very beginning of a source: byte offset 0, frame number 1.
    pub const START: PacketPosition = PacketPosition {
        frame_number: 1,
        byte_offset: 0,
    };
}
65
66/// Range of packets for partitioning.
67#[derive(Clone, Debug)]
68pub struct PacketRange {
69 /// Start position (inclusive)
70 pub start: PacketPosition,
71 /// End position (exclusive). None means read to EOF.
72 pub end: Option<PacketPosition>,
73}
74
75impl PacketRange {
76 /// Range covering the entire source
77 pub fn whole() -> Self {
78 Self {
79 start: PacketPosition::START,
80 end: None,
81 }
82 }
83
84 /// Check if a frame number is within this range
85 pub fn contains(&self, frame_number: u64) -> bool {
86 frame_number >= self.start.frame_number
87 && self
88 .end
89 .as_ref()
90 .is_none_or(|e| frame_number < e.frame_number)
91 }
92}
93
/// Descriptive information about a packet source.
///
/// A `PacketSource` exposes one of these through `metadata()`. The optional
/// fields are `None` when the value is not known.
#[derive(Clone, Debug)]
pub struct PacketSourceMetadata {
    /// Link-layer type (e.g., 1 = Ethernet)
    pub link_type: u32,
    /// Snapshot length. `FilePacketSource::open` currently fills this with a
    /// fixed 65535 rather than a value read from the file.
    pub snaplen: u32,
    /// Total size in bytes (if known)
    pub size_bytes: Option<u64>,
    /// Total packet count (if known, e.g., from index)
    pub packet_count: Option<u64>,
    /// Whether the source supports seeking
    pub seekable: bool,
}
108
109/// Raw packet data from a reader.
110#[derive(Clone, Debug)]
111pub struct RawPacket {
112 /// Frame number (1-indexed)
113 pub frame_number: u64,
114 /// Timestamp in microseconds since Unix epoch
115 pub timestamp_us: i64,
116 /// Captured length (may be less than original)
117 pub captured_length: u32,
118 /// Original packet length on the wire
119 pub original_length: u32,
120 /// Link layer type (e.g., 1 = Ethernet)
121 pub link_type: u16,
122 /// Packet data (potentially zero-copy with Bytes)
123 pub data: Bytes,
124}
125
126impl RawPacket {
127 /// Create a new raw packet from owned data.
128 pub fn new(
129 frame_number: u64,
130 timestamp_us: i64,
131 captured_length: u32,
132 original_length: u32,
133 link_type: u16,
134 data: Vec<u8>,
135 ) -> Self {
136 Self {
137 frame_number,
138 timestamp_us,
139 captured_length,
140 original_length,
141 link_type,
142 data: Bytes::from(data),
143 }
144 }
145
146 /// Create a new raw packet from Bytes (zero-copy if already Bytes).
147 pub fn from_bytes(
148 frame_number: u64,
149 timestamp_us: i64,
150 captured_length: u32,
151 original_length: u32,
152 link_type: u16,
153 data: Bytes,
154 ) -> Self {
155 Self {
156 frame_number,
157 timestamp_us,
158 captured_length,
159 original_length,
160 link_type,
161 data,
162 }
163 }
164
165 /// Check if the packet was truncated during capture.
166 pub fn is_truncated(&self) -> bool {
167 self.captured_length < self.original_length
168 }
169}
170
171/// Source of packet data. Creates readers and computes partitions.
172///
173/// This trait uses an associated type for `Reader` to enable static dispatch
174/// in the hot path, matching the enum-dispatch pattern used for protocols.
175///
176/// # Design Notes
177///
178/// We use generics rather than `Box<dyn PacketReader>` because:
179/// 1. Each QueryEngine uses ONE source type (no heterogeneous mixing)
180/// 2. The hot loop uses `reader.process_packets()` for zero-copy access
181/// 3. Static dispatch enables inlining and optimization
182/// 4. Type erasure happens at DataFusion boundaries anyway
183pub trait PacketSource: Send + Sync + Clone + 'static {
184 /// The reader type this source produces
185 type Reader: PacketReader;
186
187 /// Get metadata about this source
188 fn metadata(&self) -> &PacketSourceMetadata;
189
190 /// Create a reader for the given range.
191 /// If range is None, reads the entire source.
192 fn reader(&self, range: Option<&PacketRange>) -> Result<Self::Reader, Error>;
193
194 /// Compute partition boundaries for parallel reading.
195 ///
196 /// Returns at most `max_partitions` non-overlapping ranges that cover
197 /// the entire source. The default implementation returns a single
198 /// partition (the whole source).
199 ///
200 /// # Phase 2.5
201 ///
202 /// This default implementation is sufficient for Phase 2.
203 /// Phase 2.5 will override this to scan the file and find
204 /// packet boundaries at approximately equal byte offsets.
205 fn partitions(&self, _max_partitions: usize) -> Result<Vec<PacketRange>, Error> {
206 Ok(vec![PacketRange::whole()])
207 }
208
209 /// Get the link type for this source.
210 fn link_type(&self) -> u32 {
211 self.metadata().link_type
212 }
213}
214
/// Sequential reader of packets from a source.
///
/// This is the hot path - implementations should be optimized for
/// sequential reading with minimal overhead per packet.
///
/// ## Zero-Copy API
///
/// Uses `process_packets()` with a callback pattern to avoid copying
/// packet data. The callback receives borrowed packet data that is
/// only valid during the callback invocation.
pub trait PacketReader: Send + Unpin {
    /// Process up to `max` packets with borrowed data via callback.
    ///
    /// The callback receives a [`PacketRef`] and must process it
    /// (parse, add to Arrow builders, etc.) before returning. The borrow
    /// is only valid during the callback.
    ///
    /// Returns the number of packets processed (may be less than `max` at EOF).
    ///
    /// # Errors
    ///
    /// Both this method and the callback return `Result<_, Error>`.
    /// NOTE(review): whether a callback error stops the batch immediately is
    /// up to each implementation - confirm against the implementor.
    ///
    /// # Example
    ///
    /// ```ignore
    /// reader.process_packets(1024, |packet| {
    ///     // packet.data is borrowed - must be processed here
    ///     let parsed = parse_packet(packet.link_type, packet.data);
    ///     builder.add_row(packet.frame_number, &parsed);
    ///     Ok(())
    /// })?;
    /// ```
    fn process_packets<F>(&mut self, max: usize, f: F) -> Result<usize, Error>
    where
        F: FnMut(PacketRef<'_>) -> Result<(), Error>;

    /// Current position in the source, returned by value.
    fn position(&self) -> PacketPosition;

    /// Get the link type for packets from this reader.
    fn link_type(&self) -> u32;
}
254
255/// Packet source backed by a PCAP/PCAPNG file.
256#[derive(Clone)]
257pub struct FilePacketSource {
258 path: PathBuf,
259 metadata: PacketSourceMetadata,
260}
261
262impl FilePacketSource {
263 /// Open a PCAP file as a packet source.
264 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
265 let path = path.as_ref().to_path_buf();
266
267 // Open once to get link type and other metadata
268 let reader = PcapReader::open(&path)?;
269 let link_type = reader.link_type() as u32;
270
271 let size_bytes = std::fs::metadata(&path).ok().map(|m| m.len());
272
273 let metadata = PacketSourceMetadata {
274 link_type,
275 snaplen: 65535, // Default, could read from header
276 size_bytes,
277 packet_count: None, // Would require scanning the file
278 seekable: true,
279 };
280
281 Ok(Self { path, metadata })
282 }
283
284 /// Get the path to the file.
285 pub fn file_path(&self) -> &Path {
286 &self.path
287 }
288}
289
290impl PacketSource for FilePacketSource {
291 type Reader = FilePacketReader;
292
293 fn metadata(&self) -> &PacketSourceMetadata {
294 &self.metadata
295 }
296
297 fn reader(&self, range: Option<&PacketRange>) -> Result<Self::Reader, Error> {
298 FilePacketReader::open(&self.path, self.metadata.link_type, range)
299 }
300
301 // partitions() uses default implementation (single partition)
302 // Phase 2.5 will override this
303}
304
305/// Sequential packet reader for PCAP files.
306pub struct FilePacketReader {
307 inner: PcapReader,
308 link_type: u32,
309 position: PacketPosition,
310 range: Option<PacketRange>,
311}
312
313impl FilePacketReader {
314 /// Open a reader starting at the given position.
315 fn open(path: &Path, link_type: u32, range: Option<&PacketRange>) -> Result<Self, Error> {
316 let inner = PcapReader::open(path)?;
317
318 // If starting position specified with frame > 1, we'd need to scan
319 // For now, we always start from the beginning
320 let position = if let Some(r) = range {
321 if r.start.frame_number > 1 {
322 tracing::warn!(
323 "FilePacketReader doesn't support seeking to frame {}, starting from beginning",
324 r.start.frame_number
325 );
326 }
327 r.start.clone()
328 } else {
329 PacketPosition::START
330 };
331
332 Ok(Self {
333 inner,
334 link_type,
335 position,
336 range: range.cloned(),
337 })
338 }
339
340 /// Check if we've reached the end of our range
341 #[inline]
342 fn past_range_end(&self) -> bool {
343 if let Some(ref range) = self.range {
344 if let Some(ref end) = range.end {
345 return self.position.frame_number >= end.frame_number;
346 }
347 }
348 false
349 }
350
351 /// Skip packets until we reach the range start.
352 fn skip_to_range_start(&mut self) -> Result<(), Error> {
353 while self.inner.frame_count() + 1 < self.position.frame_number {
354 // Use process_packets to skip one packet at a time
355 let processed = self.inner.process_packets(1, |_| Ok(()))?;
356 if processed == 0 {
357 break; // EOF
358 }
359 }
360 Ok(())
361 }
362}
363
364impl PacketReader for FilePacketReader {
365 fn process_packets<F>(&mut self, max: usize, mut f: F) -> Result<usize, Error>
366 where
367 F: FnMut(PacketRef<'_>) -> Result<(), Error>,
368 {
369 // Check range bounds
370 if self.past_range_end() {
371 return Ok(0);
372 }
373
374 // Skip packets before our range start (using internal skip method)
375 self.skip_to_range_start()?;
376
377 // Calculate how many packets we can process before hitting range end
378 let effective_max = if let Some(ref range) = self.range {
379 if let Some(ref end) = range.end {
380 let remaining = end.frame_number.saturating_sub(self.inner.frame_count());
381 max.min(remaining as usize)
382 } else {
383 max
384 }
385 } else {
386 max
387 };
388
389 // Delegate to underlying PcapReader's zero-copy implementation
390 let count = self.inner.process_packets(effective_max, |packet| {
391 // Update position tracking
392 self.position.frame_number = packet.frame_number + 1;
393 f(packet)
394 })?;
395
396 Ok(count)
397 }
398
399 #[inline]
400 fn position(&self) -> PacketPosition {
401 self.position.clone()
402 }
403
404 fn link_type(&self) -> u32 {
405 self.link_type
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a position at `frame_number` with a zero byte offset.
    fn at_frame(frame_number: u64) -> PacketPosition {
        PacketPosition {
            byte_offset: 0,
            frame_number,
        }
    }

    /// `whole()` starts at `START` and has no end bound.
    #[test]
    fn test_packet_range_whole() {
        let whole = PacketRange::whole();
        assert!(whole.end.is_none());
        assert_eq!(whole.start, PacketPosition::START);
    }

    /// `START` is frame 1 at byte offset 0.
    #[test]
    fn test_packet_position_start() {
        let PacketPosition {
            byte_offset,
            frame_number,
        } = PacketPosition::START;
        assert_eq!(frame_number, 1);
        assert_eq!(byte_offset, 0);
    }

    /// A bounded range includes its start frame and excludes its end frame.
    #[test]
    fn test_packet_range_contains() {
        let bounded = PacketRange {
            start: at_frame(5),
            end: Some(at_frame(10)),
        };

        for outside in [4, 10] {
            assert!(!bounded.contains(outside));
        }
        for inside in [5, 9] {
            assert!(bounded.contains(inside));
        }
    }

    /// Without an end bound, every frame from the start onwards is included.
    #[test]
    fn test_packet_range_contains_no_end() {
        let open_ended = PacketRange {
            start: at_frame(5),
            end: None,
        };

        assert!(!open_ended.contains(4));
        for inside in [5, 1000] {
            assert!(open_ended.contains(inside));
        }
    }
}
460}