Skip to main content

pcap_toolkit/replay/
mod.rs

//! Live packet replay onto one or more real network interfaces.
2//!
3//! Reads packets from a PCAP file and injects them via a raw `AF_PACKET`
4//! socket (Linux only) using the original or a scaled inter-packet timing.
5//!
6//! ## Timing modes
7//!
8//! | CLI flag               | Behaviour                                        |
9//! |------------------------|--------------------------------------------------|
10//! | *(none)*               | Real-time: honour original inter-packet gaps     |
11//! | `--speed 2.0`          | Multiplier: replay 2× faster than original       |
12//! | `--speed max`          | No delay — transmit as fast as the NIC allows    |
13//! | `--pps 4096`           | Fixed rate: 4 096 packets/second, ignores gaps   |
14//!
15//! ## Permissions
16//!
17//! Raw sockets require `CAP_NET_RAW`. If the capability is absent the function
18//! returns [`ReplayError::PermissionDenied`] with a remediation hint.
19
20use std::path::Path;
21use std::time::{Duration, Instant};
22
23use crate::bpf::BpfExpr;
24use crate::error::ReplayError;
25use crate::filter::{Filter, PacketMeta};
26use crate::pcap;
27
28// ── Public types ─────────────────────────────────────────────────────────────
29
/// How to pace packet transmission during replay.
#[derive(Debug, Clone, PartialEq)]
pub enum ReplaySpeed {
    /// Honour original inter-packet timing (the default).
    RealTime,
    /// Scale timing by a multiplier: `>1.0` = faster, `<1.0` = slower.
    ///
    /// [`ReplaySpeed::parse`] only ever produces finite, strictly positive
    /// multipliers; values built by hand are not validated here.
    Multiplier(f64),
    /// Send as fast as possible with no delay between packets.
    Max,
    /// Fixed transmission rate in packets per second (ignores original timing).
    Pps(u64),
}

impl ReplaySpeed {
    /// Parse a speed string.
    ///
    /// | Input   | Result                         |
    /// |---------|--------------------------------|
    /// | `"max"` | [`ReplaySpeed::Max`]           |
    /// | `"1.0"` | [`ReplaySpeed::RealTime`]      |
    /// | `"2.5"` | `ReplaySpeed::Multiplier(2.5)` |
    ///
    /// `"max"` is matched case-insensitively. Any other input must be a
    /// decimal number; a value within `f64::EPSILON` of `1.0` is normalised to
    /// [`ReplaySpeed::RealTime`].
    ///
    /// Returns `None` for unrecognised strings and for numbers that are not
    /// finite and strictly positive (zero, negatives, `nan`, `inf`, and
    /// literals such as `1e999` that overflow to infinity).
    pub fn parse(s: &str) -> Option<Self> {
        if s.eq_ignore_ascii_case("max") {
            return Some(Self::Max);
        }

        let factor = s.parse::<f64>().ok()?;

        // `f64::from_str` happily accepts "inf"/"infinity"/"nan" and turns
        // overflowing literals into +∞. An infinite multiplier would silently
        // behave like `max`, so only finite positive values are accepted.
        if !factor.is_finite() || factor <= 0.0 {
            return None;
        }

        Some(if (factor - 1.0).abs() < f64::EPSILON {
            Self::RealTime
        } else {
            Self::Multiplier(factor)
        })
    }
}
66
/// Options for [`replay_file`].
///
/// Plain data: every field is public, so callers build it with a struct
/// literal and pass it by reference.
pub struct ReplayOptions {
    /// Network interface names to transmit on (e.g. `["eth0", "eth1"]`).
    /// Each packet is sent to all interfaces (fan-out).
    ///
    /// On Linux, an empty list opens no socket and puts nothing on the wire,
    /// yet packets that pass the filters are still counted in the report.
    pub interfaces: Vec<String>,
    /// Packet pacing mode.
    pub speed: ReplaySpeed,
    /// Structured filter applied before sending each packet.
    /// An empty filter lets every packet through.
    pub filter: Filter,
    /// BPF expression filter AND-ed with `filter`. `None` = no BPF filter.
    pub bpf_filter: Option<BpfExpr>,
}
79
/// Summary returned by [`replay_file`] on success.
///
/// Both counters are per packet, not per interface: a packet fanned out to
/// several interfaces is counted once.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ReplayReport {
    /// Total packets transmitted.
    pub packets_sent: u64,
    /// Total bytes transmitted (sum of captured lengths).
    pub bytes_sent: u64,
}
88
89// ── Public entry point ───────────────────────────────────────────────────────
90
/// Replay the packets from `input` onto every interface listed in
/// `opts.interfaces`.
///
/// Packets are read in file order (sort first with `sort_file` if needed).
/// Filters are applied before each packet is sent; packets rejected by a
/// filter are skipped and not counted in the report.
///
/// This is a thin front for the platform-specific `platform::replay_impl`.
///
/// # Errors
/// Returns [`ReplayError`] on I/O failure, an unparsable PCAP file, missing
/// `CAP_NET_RAW`, unknown interface name, or if the platform is not Linux.
pub fn replay_file(input: &Path, opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
    platform::replay_impl(input, opts)
}
102
103// ── Timing helper (platform-independent) ─────────────────────────────────────
104
105/// Compute how long to wait before sending the next packet.
106///
107/// - `speed`        — pacing mode
108/// - `pkt_ts_ns`    — original capture timestamp of this packet (nanoseconds)
109/// - `first_ts_ns`  — mutable slot that stores the first packet's timestamp
110/// - `sent_count`   — number of packets already sent (used for PPS mode)
111/// - `start_time`   — wall-clock instant when the first packet was sent
112pub(crate) fn compute_delay(
113    speed: &ReplaySpeed,
114    pkt_ts_ns: u64,
115    first_ts_ns: &mut Option<u64>,
116    sent_count: u64,
117    start_time: &Instant,
118) -> Duration {
119    match speed {
120        ReplaySpeed::Max => Duration::ZERO,
121
122        ReplaySpeed::Pps(pps) => {
123            if *pps == 0 {
124                return Duration::ZERO;
125            }
126            // Packet N should go out at N * (1_000_000_000 / pps) ns after start.
127            // Use u128 to avoid overflow: sent_count * 1e9 wraps u64 after ~18.4 B
128            // packets (~5 h at 1 Mpps). The result fits in u64 for any realistic run.
129            let target_ns = (sent_count as u128 * 1_000_000_000 / *pps as u128) as u64;
130            let elapsed_ns = start_time.elapsed().as_nanos() as u64;
131            if target_ns > elapsed_ns {
132                Duration::from_nanos(target_ns - elapsed_ns)
133            } else {
134                Duration::ZERO
135            }
136        }
137
138        ReplaySpeed::RealTime | ReplaySpeed::Multiplier(_) => {
139            // Measure the gap from the first packet's timestamp, then scale.
140            let first = *first_ts_ns.get_or_insert(pkt_ts_ns);
141            let capture_gap_ns = pkt_ts_ns.saturating_sub(first);
142            let scaled_gap_ns = match speed {
143                ReplaySpeed::Multiplier(f) => (capture_gap_ns as f64 / f) as u64,
144                _ => capture_gap_ns,
145            };
146            let elapsed_ns = start_time.elapsed().as_nanos() as u64;
147            if scaled_gap_ns > elapsed_ns {
148                Duration::from_nanos(scaled_gap_ns - elapsed_ns)
149            } else {
150                Duration::ZERO
151            }
152        }
153    }
154}
155
156// ── Platform implementations ─────────────────────────────────────────────────
157
158#[cfg(target_os = "linux")]
159mod platform {
160    use socket2::{Domain, Protocol, Socket, Type};
161
162    use super::*;
163
    /// `AF_PACKET` address-family constant (Linux, all architectures).
    ///
    /// Used both as the socket domain and as `sll_family` when binding.
    const AF_PACKET: i32 = 17;

    /// `ETH_P_ALL` in network byte order — required by `AF_PACKET` sockets.
    ///
    /// `htons(0x0003)` = `0x0300` = 768 on little-endian (the overwhelming
    /// majority of Linux systems). Computed portably with `to_be()`, which
    /// leaves the value as `0x0003` on a big-endian target.
    const ETH_P_ALL_NBO: i32 = (0x0003_u16.to_be()) as i32;
172
    /// Mirror of `struct sockaddr_ll` from `<linux/if_packet.h>`.
    ///
    /// `AF_PACKET` bind requires this address structure; `socket2` has no
    /// built-in constructor for it, so we construct it directly.
    ///
    /// The value is written straight into socket address storage that is
    /// handed to `bind`, so field order and widths are part of the ABI:
    /// keep `#[repr(C)]` and do not reorder or resize anything.
    #[repr(C)]
    struct SockAddrLl {
        // Address family; set to `AF_PACKET` when binding.
        sll_family: u16,
        // Protocol in network byte order; set to `ETH_P_ALL` when binding.
        sll_protocol: u16,
        // Index of the interface to bind to.
        sll_ifindex: i32,
        // The remaining fields are left zeroed by the bind path in this module.
        sll_hatype: u16,
        sll_pkttype: u8,
        sll_halen: u8,
        sll_addr: [u8; 8],
    }
187
188    /// Read the interface index from `/sys/class/net/<iface>/ifindex`.
189    ///
190    /// This avoids a `libc` dependency while remaining reliable on any
191    /// Linux system (the sysfs entry is always present for real interfaces).
192    fn read_ifindex(iface: &str) -> Result<i32, ReplayError> {
193        let path = format!("/sys/class/net/{iface}/ifindex");
194        let s = std::fs::read_to_string(&path)
195            .map_err(|_| ReplayError::UnknownInterface(iface.to_owned()))?;
196        s.trim()
197            .parse::<i32>()
198            .map_err(|_| ReplayError::UnknownInterface(iface.to_owned()))
199    }
200
    /// Open an `AF_PACKET / SOCK_RAW` socket and bind it to `iface`.
    ///
    /// The socket is created first and the interface index is looked up
    /// afterwards, so without `CAP_NET_RAW` the permission error is reported
    /// even if `iface` does not exist.
    ///
    /// # Errors
    /// - [`ReplayError::PermissionDenied`] when socket creation or `bind`
    ///   fails with `ErrorKind::PermissionDenied`.
    /// - [`ReplayError::UnknownInterface`] when the interface index cannot be
    ///   read.
    /// - [`ReplayError::Io`] for every other I/O failure.
    fn open_raw_socket(iface: &str) -> Result<Socket, ReplayError> {
        let sock = Socket::new(
            Domain::from(AF_PACKET),
            Type::RAW,
            Some(Protocol::from(ETH_P_ALL_NBO)),
        )
        .map_err(|e| {
            if e.kind() == std::io::ErrorKind::PermissionDenied {
                ReplayError::PermissionDenied(
                    "creating a raw AF_PACKET socket requires CAP_NET_RAW; \
                     run as root or: sudo setcap cap_net_raw+eip <binary>"
                        .to_owned(),
                )
            } else {
                ReplayError::Io(e)
            }
        })?;

        let ifindex = read_ifindex(iface)?;

        // Build sockaddr_ll and bind so that all outgoing packets use this NIC.
        // try_init returns (T, SockAddr) where T is the closure's Ok type.
        //
        // SAFETY: the closure reinterprets the storage pointer as `SockAddrLl`
        // (a `#[repr(C)]` struct), assigns every one of its fields, and reports
        // exactly `size_of::<SockAddrLl>()` through `len`.
        // NOTE(review): soundness relies on socket2 handing the closure storage
        // that is at least that large and suitably aligned (a zeroed
        // `sockaddr_storage`, per its docs) — confirm for the socket2 version
        // in use.
        let (_, addr) = unsafe {
            socket2::SockAddr::try_init(|storage, len| {
                let sa = &mut *storage.cast::<SockAddrLl>();
                sa.sll_family = AF_PACKET as u16;
                sa.sll_protocol = 0x0003_u16.to_be(); // ETH_P_ALL in NBO
                sa.sll_ifindex = ifindex;
                sa.sll_hatype = 0;
                sa.sll_pkttype = 0;
                sa.sll_halen = 0;
                sa.sll_addr = [0u8; 8];
                *len = std::mem::size_of::<SockAddrLl>() as _;
                Ok(())
            })
        }
        .map_err(ReplayError::Io)?;

        // A bind failure gets its own, shorter permission hint.
        sock.bind(&addr).map_err(|e| {
            if e.kind() == std::io::ErrorKind::PermissionDenied {
                ReplayError::PermissionDenied("binding raw socket requires CAP_NET_RAW".to_owned())
            } else {
                ReplayError::Io(e)
            }
        })?;

        Ok(sock)
    }
250
251    pub fn replay_impl(input: &Path, opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
252        let sockets: Vec<Socket> = opts
253            .interfaces
254            .iter()
255            .map(|iface| open_raw_socket(iface))
256            .collect::<Result<_, _>>()?;
257
258        let has_filter = !opts.filter.is_empty() || opts.bpf_filter.is_some();
259
260        let iter =
261            pcap::open_with_payload(input).map_err(|e| ReplayError::PcapParse(e.to_string()))?;
262
263        let mut packets_sent: u64 = 0;
264        let mut bytes_sent: u64 = 0;
265        let mut first_ts_ns: Option<u64> = None;
266        let start_time = Instant::now();
267
268        for result in iter {
269            let pkt = result.map_err(|e| ReplayError::PcapParse(e.to_string()))?;
270
271            if has_filter {
272                let meta = PacketMeta::from_packet(
273                    pkt.info.timestamp_ns,
274                    pkt.info.captured_len,
275                    &pkt.data,
276                );
277                let struct_pass = opts.filter.is_empty() || opts.filter.matches(&meta);
278                let bpf_pass = opts
279                    .bpf_filter
280                    .as_ref()
281                    .map(|b| b.eval(&meta))
282                    .unwrap_or(true);
283                if !struct_pass || !bpf_pass {
284                    continue;
285                }
286            }
287
288            let delay = compute_delay(
289                &opts.speed,
290                pkt.info.timestamp_ns,
291                &mut first_ts_ns,
292                packets_sent,
293                &start_time,
294            );
295            if !delay.is_zero() {
296                std::thread::sleep(delay);
297            }
298
299            for sock in &sockets {
300                sock.send(&pkt.data).map_err(ReplayError::Io)?;
301            }
302            packets_sent += 1;
303            bytes_sent += pkt.data.len() as u64;
304        }
305
306        Ok(ReplayReport {
307            packets_sent,
308            bytes_sent,
309        })
310    }
311}
312
// Fallback for every non-Linux target: replay is not available there.
#[cfg(not(target_os = "linux"))]
mod platform {
    use super::*;

    /// Stub used off Linux; ignores both arguments and always returns
    /// [`ReplayError::NotSupported`].
    pub fn replay_impl(_input: &Path, _opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
        Err(ReplayError::NotSupported)
    }
}
321
322// ── Unit tests ────────────────────────────────────────────────────────────────
323
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_replay_speed_parse_max() {
        // "max" is matched case-insensitively.
        assert!(matches!(ReplaySpeed::parse("max"), Some(ReplaySpeed::Max)));
        assert!(matches!(ReplaySpeed::parse("MAX"), Some(ReplaySpeed::Max)));
        assert!(matches!(ReplaySpeed::parse("Max"), Some(ReplaySpeed::Max)));
    }

    #[test]
    fn test_replay_speed_parse_real_time() {
        // A multiplier of exactly one is normalised to RealTime.
        assert!(matches!(
            ReplaySpeed::parse("1.0"),
            Some(ReplaySpeed::RealTime)
        ));
        assert!(matches!(
            ReplaySpeed::parse("1"),
            Some(ReplaySpeed::RealTime)
        ));
    }

    #[test]
    fn test_replay_speed_parse_multiplier() {
        assert!(matches!(
            ReplaySpeed::parse("2.0"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 2.0).abs() < 1e-9
        ));
        assert!(matches!(
            ReplaySpeed::parse("0.5"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 0.5).abs() < 1e-9
        ));
        assert!(matches!(
            ReplaySpeed::parse("10"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 10.0).abs() < 1e-9
        ));
    }

    #[test]
    fn test_replay_speed_parse_invalid() {
        assert!(ReplaySpeed::parse("").is_none());
        assert!(ReplaySpeed::parse("abc").is_none());
        assert!(ReplaySpeed::parse("-1.0").is_none());
        assert!(ReplaySpeed::parse("0").is_none());
        assert!(ReplaySpeed::parse("0.0").is_none());
    }

    #[test]
    fn test_compute_delay_max_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Max, 1_000_000_000, &mut first_ts, 0, &start);
        assert_eq!(d, Duration::ZERO);
    }

    #[test]
    fn test_compute_delay_real_time_first_packet_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(
            &ReplaySpeed::RealTime,
            1_000_000_000,
            &mut first_ts,
            0,
            &start,
        );
        // First packet: gap = 0, so delay must be zero (or negligible).
        assert!(d < Duration::from_millis(10));
    }

    #[test]
    fn test_compute_delay_pps_first_packet_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        // Packet 0 at 1000 pps: target = 0 * 1e9 / 1000 = 0 ns.
        let d = compute_delay(&ReplaySpeed::Pps(1000), 0, &mut first_ts, 0, &start);
        assert_eq!(d, Duration::ZERO);
    }

    #[test]
    fn test_compute_delay_pps_zero_is_safe() {
        // A zero rate must neither divide by zero nor stall.
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(0), 0, &mut first_ts, 5, &start);
        assert_eq!(d, Duration::ZERO);
    }

    #[test]
    fn test_compute_delay_pps_spacing() {
        // At 1 000 pps packet 1 should be sent 1 ms after packet 0.
        // The delay is requested immediately after taking `start`, so
        // elapsed ≈ 0 and the returned delay should be close to the target
        // offset.
        let mut first_ts = None;
        let start = Instant::now();
        // Packet 1: target = 1 * 1e9 / 1000 = 1_000_000 ns = 1 ms.
        let d = compute_delay(&ReplaySpeed::Pps(1_000), 0, &mut first_ts, 1, &start);
        assert!(
            d >= Duration::from_micros(990) && d <= Duration::from_millis(2),
            "expected ~1 ms delay, got {d:?}"
        );
    }

    #[test]
    fn test_compute_delay_pps_no_overflow_at_high_count() {
        // sent_count * 1_000_000_000 would overflow u64 at ~18.4 B.
        // At 10 Gpps this threshold is crossed after roughly 1.8 seconds of
        // replay. Verify the result is still sane (positive, not wrapped).
        let pps: u64 = 10_000_000_000; // 10 Gpps
        let sent_count: u64 = 20_000_000_000; // 20 B packets — past the u64 overflow point
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(pps), 0, &mut first_ts, sent_count, &start);
        // target_ns = 20e9 * 1e9 / 10e9 = 2_000_000_000 ns = 2 s
        // elapsed is ~0, so delay should be ~2 s and must not be zero (wrap sentinel).
        assert!(
            d >= Duration::from_millis(1_990),
            "delay wrapped or saturated: got {d:?}"
        );
    }

    #[test]
    fn test_compute_delay_pps_past_target_is_zero() {
        // If we are already past the target time, delay must be zero (no negative sleep).
        let mut first_ts = None;
        // Backdate start by 10 s so elapsed >> target.
        let start = Instant::now() - Duration::from_secs(10);
        // Packet 1 at 1 000 pps: target = 1 ms, but 10 s have elapsed.
        let d = compute_delay(&ReplaySpeed::Pps(1_000), 0, &mut first_ts, 1, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// On Linux: verify that a nonexistent interface produces the right error.
    /// The test passes whether it fails at socket creation (PermissionDenied —
    /// no CAP_NET_RAW in CI) or at interface lookup (UnknownInterface).
    #[cfg(target_os = "linux")]
    #[test]
    fn test_replay_unknown_interface_returns_error() {
        use crate::filter::Filter;

        let opts = ReplayOptions {
            interfaces: vec!["nonexistent_iface_xyz999".to_owned()],
            speed: ReplaySpeed::Max,
            filter: Filter::default(),
            bpf_filter: None,
        };

        // Build a minimal valid PCAP (header only, no packets).
        let mut pcap_bytes = Vec::new();
        pcap_bytes.extend_from_slice(&0xa1b2_c3d4u32.to_le_bytes()); // magic
        pcap_bytes.extend_from_slice(&2u16.to_le_bytes()); // version major
        pcap_bytes.extend_from_slice(&4u16.to_le_bytes()); // version minor
        pcap_bytes.extend_from_slice(&0i32.to_le_bytes()); // thiszone
        pcap_bytes.extend_from_slice(&0u32.to_le_bytes()); // sigfigs
        pcap_bytes.extend_from_slice(&65535u32.to_le_bytes()); // snaplen
        pcap_bytes.extend_from_slice(&1i32.to_le_bytes()); // network (Ethernet)

        // Per-process file name so concurrent runs of the suite cannot clash.
        let path = std::env::temp_dir().join(format!(
            "replay_test_no_iface_{}.pcap",
            std::process::id()
        ));
        std::fs::write(&path, &pcap_bytes).unwrap();

        let result = replay_file(&path, &opts);
        // Remove the fixture before asserting so a failing assertion cannot
        // leave it behind; a cleanup failure is not what this test is about.
        let _ = std::fs::remove_file(&path);

        let err = result.unwrap_err();
        assert!(
            matches!(
                err,
                ReplayError::PermissionDenied(_) | ReplayError::UnknownInterface(_)
            ),
            "expected PermissionDenied or UnknownInterface, got: {err}"
        );
    }

    #[cfg(not(target_os = "linux"))]
    #[test]
    fn test_replay_not_supported_on_non_linux() {
        use crate::filter::Filter;

        let opts = ReplayOptions {
            interfaces: vec!["eth0".to_owned()],
            speed: ReplaySpeed::RealTime,
            filter: Filter::default(),
            bpf_filter: None,
        };
        // The stub rejects before looking at the path, so any path will do.
        let path = std::path::Path::new("/dev/null");
        assert!(matches!(
            replay_file(path, &opts),
            Err(ReplayError::NotSupported)
        ));
    }
}