1use std::path::Path;
21use std::time::{Duration, Instant};
22
23use crate::bpf::BpfExpr;
24use crate::error::ReplayError;
25use crate::filter::{Filter, PacketMeta};
26use crate::pcap;
27
/// Pacing policy applied between packets during a replay.
#[derive(Clone, Debug)]
pub enum ReplaySpeed {
    /// Keep the inter-packet gaps recorded in the capture.
    RealTime,
    /// Divide the recorded gaps by this factor (values above 1.0 replay
    /// faster, values below 1.0 replay slower).
    Multiplier(f64),
    /// Send packets back to back with no pacing delay.
    Max,
    /// Send at a fixed number of packets per second; `0` disables pacing.
    Pps(u64),
}
42
43impl ReplaySpeed {
44 pub fn parse(s: &str) -> Option<Self> {
54 if s.eq_ignore_ascii_case("max") {
55 return Some(ReplaySpeed::Max);
56 }
57 s.parse::<f64>().ok().filter(|&f| f > 0.0).map(|f| {
58 if (f - 1.0).abs() < f64::EPSILON {
59 ReplaySpeed::RealTime
60 } else {
61 ReplaySpeed::Multiplier(f)
62 }
63 })
64 }
65}
66
67pub struct ReplayOptions {
69 pub interfaces: Vec<String>,
72 pub speed: ReplaySpeed,
74 pub filter: Filter,
76 pub bpf_filter: Option<BpfExpr>,
78}
79
/// Totals accumulated over a completed replay.
#[derive(Debug)]
pub struct ReplayReport {
    /// Number of packets that passed the filters and were transmitted.
    pub packets_sent: u64,
    /// Sum of the payload lengths of those packets, in bytes.
    pub bytes_sent: u64,
}
88
89pub fn replay_file(input: &Path, opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
100 platform::replay_impl(input, opts)
101}
102
103pub(crate) fn compute_delay(
113 speed: &ReplaySpeed,
114 pkt_ts_ns: u64,
115 first_ts_ns: &mut Option<u64>,
116 sent_count: u64,
117 start_time: &Instant,
118) -> Duration {
119 match speed {
120 ReplaySpeed::Max => Duration::ZERO,
121
122 ReplaySpeed::Pps(pps) => {
123 if *pps == 0 {
124 return Duration::ZERO;
125 }
126 let target_ns = (sent_count as u128 * 1_000_000_000 / *pps as u128) as u64;
130 let elapsed_ns = start_time.elapsed().as_nanos() as u64;
131 if target_ns > elapsed_ns {
132 Duration::from_nanos(target_ns - elapsed_ns)
133 } else {
134 Duration::ZERO
135 }
136 }
137
138 ReplaySpeed::RealTime | ReplaySpeed::Multiplier(_) => {
139 let first = *first_ts_ns.get_or_insert(pkt_ts_ns);
141 let capture_gap_ns = pkt_ts_ns.saturating_sub(first);
142 let scaled_gap_ns = match speed {
143 ReplaySpeed::Multiplier(f) => (capture_gap_ns as f64 / f) as u64,
144 _ => capture_gap_ns,
145 };
146 let elapsed_ns = start_time.elapsed().as_nanos() as u64;
147 if scaled_gap_ns > elapsed_ns {
148 Duration::from_nanos(scaled_gap_ns - elapsed_ns)
149 } else {
150 Duration::ZERO
151 }
152 }
153 }
154}
155
156#[cfg(target_os = "linux")]
159mod platform {
160 use socket2::{Domain, Protocol, Socket, Type};
161
162 use super::*;
163
164 const AF_PACKET: i32 = 17;
166
167 const ETH_P_ALL_NBO: i32 = (0x0003_u16.to_be()) as i32;
172
173 #[repr(C)]
178 struct SockAddrLl {
179 sll_family: u16,
180 sll_protocol: u16,
181 sll_ifindex: i32,
182 sll_hatype: u16,
183 sll_pkttype: u8,
184 sll_halen: u8,
185 sll_addr: [u8; 8],
186 }
187
188 fn read_ifindex(iface: &str) -> Result<i32, ReplayError> {
193 let path = format!("/sys/class/net/{iface}/ifindex");
194 let s = std::fs::read_to_string(&path)
195 .map_err(|_| ReplayError::UnknownInterface(iface.to_owned()))?;
196 s.trim()
197 .parse::<i32>()
198 .map_err(|_| ReplayError::UnknownInterface(iface.to_owned()))
199 }
200
201 fn open_raw_socket(iface: &str) -> Result<Socket, ReplayError> {
203 let sock = Socket::new(
204 Domain::from(AF_PACKET),
205 Type::RAW,
206 Some(Protocol::from(ETH_P_ALL_NBO)),
207 )
208 .map_err(|e| {
209 if e.kind() == std::io::ErrorKind::PermissionDenied {
210 ReplayError::PermissionDenied(
211 "creating a raw AF_PACKET socket requires CAP_NET_RAW; \
212 run as root or: sudo setcap cap_net_raw+eip <binary>"
213 .to_owned(),
214 )
215 } else {
216 ReplayError::Io(e)
217 }
218 })?;
219
220 let ifindex = read_ifindex(iface)?;
221
222 let (_, addr) = unsafe {
225 socket2::SockAddr::try_init(|storage, len| {
226 let sa = &mut *storage.cast::<SockAddrLl>();
227 sa.sll_family = AF_PACKET as u16;
228 sa.sll_protocol = 0x0003_u16.to_be(); sa.sll_ifindex = ifindex;
230 sa.sll_hatype = 0;
231 sa.sll_pkttype = 0;
232 sa.sll_halen = 0;
233 sa.sll_addr = [0u8; 8];
234 *len = std::mem::size_of::<SockAddrLl>() as _;
235 Ok(())
236 })
237 }
238 .map_err(ReplayError::Io)?;
239
240 sock.bind(&addr).map_err(|e| {
241 if e.kind() == std::io::ErrorKind::PermissionDenied {
242 ReplayError::PermissionDenied("binding raw socket requires CAP_NET_RAW".to_owned())
243 } else {
244 ReplayError::Io(e)
245 }
246 })?;
247
248 Ok(sock)
249 }
250
251 pub fn replay_impl(input: &Path, opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
252 let sockets: Vec<Socket> = opts
253 .interfaces
254 .iter()
255 .map(|iface| open_raw_socket(iface))
256 .collect::<Result<_, _>>()?;
257
258 let has_filter = !opts.filter.is_empty() || opts.bpf_filter.is_some();
259
260 let iter =
261 pcap::open_with_payload(input).map_err(|e| ReplayError::PcapParse(e.to_string()))?;
262
263 let mut packets_sent: u64 = 0;
264 let mut bytes_sent: u64 = 0;
265 let mut first_ts_ns: Option<u64> = None;
266 let start_time = Instant::now();
267
268 for result in iter {
269 let pkt = result.map_err(|e| ReplayError::PcapParse(e.to_string()))?;
270
271 if has_filter {
272 let meta = PacketMeta::from_packet(
273 pkt.info.timestamp_ns,
274 pkt.info.captured_len,
275 &pkt.data,
276 );
277 let struct_pass = opts.filter.is_empty() || opts.filter.matches(&meta);
278 let bpf_pass = opts
279 .bpf_filter
280 .as_ref()
281 .map(|b| b.eval(&meta))
282 .unwrap_or(true);
283 if !struct_pass || !bpf_pass {
284 continue;
285 }
286 }
287
288 let delay = compute_delay(
289 &opts.speed,
290 pkt.info.timestamp_ns,
291 &mut first_ts_ns,
292 packets_sent,
293 &start_time,
294 );
295 if !delay.is_zero() {
296 std::thread::sleep(delay);
297 }
298
299 for sock in &sockets {
300 sock.send(&pkt.data).map_err(ReplayError::Io)?;
301 }
302 packets_sent += 1;
303 bytes_sent += pkt.data.len() as u64;
304 }
305
306 Ok(ReplayReport {
307 packets_sent,
308 bytes_sent,
309 })
310 }
311}
312
/// Fallback backend for every target other than Linux, where replay is not
/// available.
#[cfg(not(target_os = "linux"))]
mod platform {
    use super::*;

    /// Always fails with `ReplayError::NotSupported`; both arguments are
    /// ignored.
    pub fn replay_impl(_input: &Path, _opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
        Err(ReplayError::NotSupported)
    }
}
321
/// Unit tests for speed parsing, delay computation and the platform entry
/// point.
#[cfg(test)]
mod tests {
    use super::*;

    /// "max" is recognised regardless of ASCII case.
    #[test]
    fn test_replay_speed_parse_max() {
        for text in ["max", "MAX", "Max"] {
            assert!(
                matches!(ReplaySpeed::parse(text), Some(ReplaySpeed::Max)),
                "{text:?} should parse as Max"
            );
        }
    }

    /// A multiplier of exactly one is reported as `RealTime`.
    #[test]
    fn test_replay_speed_parse_real_time() {
        assert!(matches!(
            ReplaySpeed::parse("1.0"),
            Some(ReplaySpeed::RealTime)
        ));
        assert!(matches!(
            ReplaySpeed::parse("1"),
            Some(ReplaySpeed::RealTime)
        ));
    }

    /// Positive numbers other than one become `Multiplier`.
    #[test]
    fn test_replay_speed_parse_multiplier() {
        assert!(matches!(
            ReplaySpeed::parse("2.0"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 2.0).abs() < 1e-9
        ));
        assert!(matches!(
            ReplaySpeed::parse("0.5"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 0.5).abs() < 1e-9
        ));
        assert!(matches!(
            ReplaySpeed::parse("10"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 10.0).abs() < 1e-9
        ));
    }

    /// Empty, non-numeric, negative and zero inputs are rejected.
    #[test]
    fn test_replay_speed_parse_invalid() {
        assert!(ReplaySpeed::parse("").is_none());
        assert!(ReplaySpeed::parse("abc").is_none());
        assert!(ReplaySpeed::parse("-1.0").is_none());
        assert!(ReplaySpeed::parse("0").is_none());
        assert!(ReplaySpeed::parse("0.0").is_none());
    }

    #[test]
    fn test_compute_delay_max_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Max, 1_000_000_000, &mut first_ts, 0, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// The first packet defines the timing baseline, so its capture gap is
    /// zero and no meaningful delay is expected.
    #[test]
    fn test_compute_delay_real_time_first_packet_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(
            &ReplaySpeed::RealTime,
            1_000_000_000,
            &mut first_ts,
            0,
            &start,
        );
        assert!(d < Duration::from_millis(10));
    }

    /// With nothing sent yet the target offset is zero.
    #[test]
    fn test_compute_delay_pps_first_packet_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(1000), 0, &mut first_ts, 0, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// A rate of zero must not divide by zero.
    #[test]
    fn test_compute_delay_pps_zero_is_safe() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(0), 0, &mut first_ts, 5, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// At 1000 pps the second packet is due 1 ms after the start.
    #[test]
    fn test_compute_delay_pps_spacing() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(1_000), 0, &mut first_ts, 1, &start);
        assert!(
            d >= Duration::from_micros(990) && d <= Duration::from_millis(2),
            "expected ~1 ms delay, got {d:?}"
        );
    }

    /// `sent_count * 1e9` exceeds `u64::MAX` here; the target must still
    /// come out as 2 s rather than a wrapped value.
    #[test]
    fn test_compute_delay_pps_no_overflow_at_high_count() {
        let pps: u64 = 10_000_000_000;
        let sent_count: u64 = 20_000_000_000;
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(pps), 0, &mut first_ts, sent_count, &start);
        assert!(
            d >= Duration::from_millis(1_990),
            "delay wrapped or saturated: got {d:?}"
        );
    }

    /// When the replay is already behind schedule the delay clamps to zero.
    #[test]
    fn test_compute_delay_pps_past_target_is_zero() {
        let mut first_ts = None;
        // Pretend the replay started 10 s ago.
        let start = Instant::now() - Duration::from_secs(10);
        let d = compute_delay(&ReplaySpeed::Pps(1_000), 0, &mut first_ts, 1, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// Replaying onto an interface that does not exist must fail. Without
    /// CAP_NET_RAW the socket creation fails first, so both errors are
    /// accepted.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_replay_unknown_interface_returns_error() {
        use crate::filter::Filter;

        let opts = ReplayOptions {
            interfaces: vec!["nonexistent_iface_xyz999".to_owned()],
            speed: ReplaySpeed::Max,
            filter: Filter::default(),
            bpf_filter: None,
        };

        // Minimal pcap file: a global header with no packet records.
        let mut pcap_bytes = Vec::new();
        pcap_bytes.extend_from_slice(&0xa1b2_c3d4u32.to_le_bytes()); // magic
        pcap_bytes.extend_from_slice(&2u16.to_le_bytes()); // version major
        pcap_bytes.extend_from_slice(&4u16.to_le_bytes()); // version minor
        pcap_bytes.extend_from_slice(&0i32.to_le_bytes()); // thiszone
        pcap_bytes.extend_from_slice(&0u32.to_le_bytes()); // sigfigs
        pcap_bytes.extend_from_slice(&65535u32.to_le_bytes()); // snaplen
        pcap_bytes.extend_from_slice(&1i32.to_le_bytes()); // link type

        // A per-process file name keeps concurrent test runs from clobbering
        // each other's fixture.
        let path = std::env::temp_dir().join(format!(
            "replay_test_no_iface_{}.pcap",
            std::process::id()
        ));
        std::fs::write(&path, &pcap_bytes).unwrap();

        let result = replay_file(&path, &opts);
        // Remove the fixture before asserting so a failure cannot leak it.
        let _ = std::fs::remove_file(&path);

        let err = result.unwrap_err();
        assert!(
            matches!(
                err,
                ReplayError::PermissionDenied(_) | ReplayError::UnknownInterface(_)
            ),
            "expected PermissionDenied or UnknownInterface, got: {err}"
        );
    }

    /// On every other platform the backend reports `NotSupported`.
    #[cfg(not(target_os = "linux"))]
    #[test]
    fn test_replay_not_supported_on_non_linux() {
        use crate::filter::Filter;

        let opts = ReplayOptions {
            interfaces: vec!["eth0".to_owned()],
            speed: ReplaySpeed::RealTime,
            filter: Filter::default(),
            bpf_filter: None,
        };
        let path = std::path::Path::new("/dev/null");
        assert!(matches!(
            replay_file(path, &opts),
            Err(ReplayError::NotSupported)
        ));
    }
}