1use std::path::Path;
21use std::time::{Duration, Instant};
22
23use crate::bpf::BpfExpr;
24use crate::error::ReplayError;
25use crate::filter::{Filter, PacketMeta};
26use crate::pcap;
27
28#[derive(Debug, Clone)]
32pub enum ReplaySpeed {
33 RealTime,
35 Multiplier(f64),
37 Max,
39 Pps(u64),
41}
42
43impl ReplaySpeed {
44 pub fn parse(s: &str) -> Option<Self> {
54 if s.eq_ignore_ascii_case("max") {
55 return Some(ReplaySpeed::Max);
56 }
57 s.parse::<f64>().ok().filter(|&f| f > 0.0).map(|f| {
58 if (f - 1.0).abs() < f64::EPSILON {
59 ReplaySpeed::RealTime
60 } else {
61 ReplaySpeed::Multiplier(f)
62 }
63 })
64 }
65}
66
67pub struct ReplayOptions {
69 pub interfaces: Vec<String>,
72 pub speed: ReplaySpeed,
74 pub filter: Filter,
76 pub bpf_filter: Option<BpfExpr>,
78}
79
80#[derive(Debug)]
82pub struct ReplayReport {
83 pub packets_sent: u64,
85 pub bytes_sent: u64,
87}
88
89pub fn replay_file(input: &Path, opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
100 platform::replay_impl(input, opts)
101}
102
103pub(crate) fn compute_delay(
113 speed: &ReplaySpeed,
114 pkt_ts_ns: u64,
115 first_ts_ns: &mut Option<u64>,
116 sent_count: u64,
117 start_time: &Instant,
118) -> Duration {
119 match speed {
120 ReplaySpeed::Max => Duration::ZERO,
121
122 ReplaySpeed::Pps(pps) => {
123 if *pps == 0 {
124 return Duration::ZERO;
125 }
126 let target_ns = sent_count * 1_000_000_000 / pps;
128 let elapsed_ns = start_time.elapsed().as_nanos() as u64;
129 if target_ns > elapsed_ns {
130 Duration::from_nanos(target_ns - elapsed_ns)
131 } else {
132 Duration::ZERO
133 }
134 }
135
136 ReplaySpeed::RealTime | ReplaySpeed::Multiplier(_) => {
137 let first = *first_ts_ns.get_or_insert(pkt_ts_ns);
139 let capture_gap_ns = pkt_ts_ns.saturating_sub(first);
140 let scaled_gap_ns = match speed {
141 ReplaySpeed::Multiplier(f) => (capture_gap_ns as f64 / f) as u64,
142 _ => capture_gap_ns,
143 };
144 let elapsed_ns = start_time.elapsed().as_nanos() as u64;
145 if scaled_gap_ns > elapsed_ns {
146 Duration::from_nanos(scaled_gap_ns - elapsed_ns)
147 } else {
148 Duration::ZERO
149 }
150 }
151 }
152}
153
154#[cfg(target_os = "linux")]
157mod platform {
158 use socket2::{Domain, Protocol, Socket, Type};
159
160 use super::*;
161
162 const AF_PACKET: i32 = 17;
164
165 const ETH_P_ALL_NBO: i32 = (0x0003_u16.to_be()) as i32;
170
171 #[repr(C)]
176 struct SockAddrLl {
177 sll_family: u16,
178 sll_protocol: u16,
179 sll_ifindex: i32,
180 sll_hatype: u16,
181 sll_pkttype: u8,
182 sll_halen: u8,
183 sll_addr: [u8; 8],
184 }
185
186 fn read_ifindex(iface: &str) -> Result<i32, ReplayError> {
191 let path = format!("/sys/class/net/{iface}/ifindex");
192 let s = std::fs::read_to_string(&path)
193 .map_err(|_| ReplayError::UnknownInterface(iface.to_owned()))?;
194 s.trim()
195 .parse::<i32>()
196 .map_err(|_| ReplayError::UnknownInterface(iface.to_owned()))
197 }
198
199 fn open_raw_socket(iface: &str) -> Result<Socket, ReplayError> {
201 let sock = Socket::new(
202 Domain::from(AF_PACKET),
203 Type::RAW,
204 Some(Protocol::from(ETH_P_ALL_NBO)),
205 )
206 .map_err(|e| {
207 if e.kind() == std::io::ErrorKind::PermissionDenied {
208 ReplayError::PermissionDenied(
209 "creating a raw AF_PACKET socket requires CAP_NET_RAW; \
210 run as root or: sudo setcap cap_net_raw+eip <binary>"
211 .to_owned(),
212 )
213 } else {
214 ReplayError::Io(e)
215 }
216 })?;
217
218 let ifindex = read_ifindex(iface)?;
219
220 let (_, addr) = unsafe {
223 socket2::SockAddr::try_init(|storage, len| {
224 let sa = &mut *storage.cast::<SockAddrLl>();
225 sa.sll_family = AF_PACKET as u16;
226 sa.sll_protocol = 0x0003_u16.to_be(); sa.sll_ifindex = ifindex;
228 sa.sll_hatype = 0;
229 sa.sll_pkttype = 0;
230 sa.sll_halen = 0;
231 sa.sll_addr = [0u8; 8];
232 *len = std::mem::size_of::<SockAddrLl>() as _;
233 Ok(())
234 })
235 }
236 .map_err(ReplayError::Io)?;
237
238 sock.bind(&addr).map_err(|e| {
239 if e.kind() == std::io::ErrorKind::PermissionDenied {
240 ReplayError::PermissionDenied("binding raw socket requires CAP_NET_RAW".to_owned())
241 } else {
242 ReplayError::Io(e)
243 }
244 })?;
245
246 Ok(sock)
247 }
248
249 pub fn replay_impl(input: &Path, opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
250 let sockets: Vec<Socket> = opts
251 .interfaces
252 .iter()
253 .map(|iface| open_raw_socket(iface))
254 .collect::<Result<_, _>>()?;
255
256 let has_filter = !opts.filter.is_empty() || opts.bpf_filter.is_some();
257
258 let iter =
259 pcap::open_with_payload(input).map_err(|e| ReplayError::PcapParse(e.to_string()))?;
260
261 let mut packets_sent: u64 = 0;
262 let mut bytes_sent: u64 = 0;
263 let mut first_ts_ns: Option<u64> = None;
264 let start_time = Instant::now();
265
266 for result in iter {
267 let pkt = result.map_err(|e| ReplayError::PcapParse(e.to_string()))?;
268
269 if has_filter {
270 let meta = PacketMeta::from_packet(
271 pkt.info.timestamp_ns,
272 pkt.info.captured_len,
273 &pkt.data,
274 );
275 let struct_pass = opts.filter.is_empty() || opts.filter.matches(&meta);
276 let bpf_pass = opts
277 .bpf_filter
278 .as_ref()
279 .map(|b| b.eval(&meta))
280 .unwrap_or(true);
281 if !struct_pass || !bpf_pass {
282 continue;
283 }
284 }
285
286 let delay = compute_delay(
287 &opts.speed,
288 pkt.info.timestamp_ns,
289 &mut first_ts_ns,
290 packets_sent,
291 &start_time,
292 );
293 if !delay.is_zero() {
294 std::thread::sleep(delay);
295 }
296
297 for sock in &sockets {
298 sock.send(&pkt.data).map_err(ReplayError::Io)?;
299 }
300 packets_sent += 1;
301 bytes_sent += pkt.data.len() as u64;
302 }
303
304 Ok(ReplayReport {
305 packets_sent,
306 bytes_sent,
307 })
308 }
309}
310
311#[cfg(not(target_os = "linux"))]
312mod platform {
313 use super::*;
314
315 pub fn replay_impl(_input: &Path, _opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
316 Err(ReplayError::NotSupported)
317 }
318}
319
320#[cfg(test)]
323mod tests {
324 use super::*;
325
326 #[test]
327 fn test_replay_speed_parse_max() {
328 assert!(matches!(ReplaySpeed::parse("max"), Some(ReplaySpeed::Max)));
329 assert!(matches!(ReplaySpeed::parse("MAX"), Some(ReplaySpeed::Max)));
330 assert!(matches!(ReplaySpeed::parse("Max"), Some(ReplaySpeed::Max)));
331 }
332
333 #[test]
334 fn test_replay_speed_parse_real_time() {
335 assert!(matches!(
336 ReplaySpeed::parse("1.0"),
337 Some(ReplaySpeed::RealTime)
338 ));
339 assert!(matches!(
340 ReplaySpeed::parse("1"),
341 Some(ReplaySpeed::RealTime)
342 ));
343 }
344
345 #[test]
346 fn test_replay_speed_parse_multiplier() {
347 assert!(matches!(
348 ReplaySpeed::parse("2.0"),
349 Some(ReplaySpeed::Multiplier(f)) if (f - 2.0).abs() < 1e-9
350 ));
351 assert!(matches!(
352 ReplaySpeed::parse("0.5"),
353 Some(ReplaySpeed::Multiplier(f)) if (f - 0.5).abs() < 1e-9
354 ));
355 assert!(matches!(
356 ReplaySpeed::parse("10"),
357 Some(ReplaySpeed::Multiplier(f)) if (f - 10.0).abs() < 1e-9
358 ));
359 }
360
361 #[test]
362 fn test_replay_speed_parse_invalid() {
363 assert!(ReplaySpeed::parse("").is_none());
364 assert!(ReplaySpeed::parse("abc").is_none());
365 assert!(ReplaySpeed::parse("-1.0").is_none());
366 assert!(ReplaySpeed::parse("0").is_none());
367 assert!(ReplaySpeed::parse("0.0").is_none());
368 }
369
370 #[test]
371 fn test_compute_delay_max_is_zero() {
372 let mut first_ts = None;
373 let start = Instant::now();
374 let d = compute_delay(&ReplaySpeed::Max, 1_000_000_000, &mut first_ts, 0, &start);
375 assert_eq!(d, Duration::ZERO);
376 }
377
378 #[test]
379 fn test_compute_delay_real_time_first_packet_is_zero() {
380 let mut first_ts = None;
381 let start = Instant::now();
382 let d = compute_delay(
383 &ReplaySpeed::RealTime,
384 1_000_000_000,
385 &mut first_ts,
386 0,
387 &start,
388 );
389 assert!(d < Duration::from_millis(10));
391 }
392
393 #[test]
394 fn test_compute_delay_pps_first_packet_is_zero() {
395 let mut first_ts = None;
396 let start = Instant::now();
397 let d = compute_delay(&ReplaySpeed::Pps(1000), 0, &mut first_ts, 0, &start);
399 assert_eq!(d, Duration::ZERO);
400 }
401
402 #[test]
403 fn test_compute_delay_pps_zero_is_safe() {
404 let mut first_ts = None;
405 let start = Instant::now();
406 let d = compute_delay(&ReplaySpeed::Pps(0), 0, &mut first_ts, 5, &start);
407 assert_eq!(d, Duration::ZERO);
408 }
409
410 #[cfg(target_os = "linux")]
414 #[test]
415 fn test_replay_unknown_interface_returns_error() {
416 use crate::filter::Filter;
417
418 let opts = ReplayOptions {
419 interfaces: vec!["nonexistent_iface_xyz999".to_owned()],
420 speed: ReplaySpeed::Max,
421 filter: Filter::default(),
422 bpf_filter: None,
423 };
424
425 let mut pcap_bytes = Vec::new();
427 pcap_bytes.extend_from_slice(&0xa1b2_c3d4u32.to_le_bytes()); pcap_bytes.extend_from_slice(&2u16.to_le_bytes()); pcap_bytes.extend_from_slice(&4u16.to_le_bytes()); pcap_bytes.extend_from_slice(&0i32.to_le_bytes()); pcap_bytes.extend_from_slice(&0u32.to_le_bytes()); pcap_bytes.extend_from_slice(&65535u32.to_le_bytes()); pcap_bytes.extend_from_slice(&1i32.to_le_bytes()); let path = std::env::temp_dir().join("replay_test_no_iface.pcap");
436 std::fs::write(&path, &pcap_bytes).unwrap();
437
438 let err = replay_file(&path, &opts).unwrap_err();
439 assert!(
440 matches!(
441 err,
442 ReplayError::PermissionDenied(_) | ReplayError::UnknownInterface(_)
443 ),
444 "expected PermissionDenied or UnknownInterface, got: {err}"
445 );
446 }
447
448 #[cfg(not(target_os = "linux"))]
449 #[test]
450 fn test_replay_not_supported_on_non_linux() {
451 use crate::filter::Filter;
452
453 let opts = ReplayOptions {
454 interfaces: vec!["eth0".to_owned()],
455 speed: ReplaySpeed::RealTime,
456 filter: Filter::default(),
457 bpf_filter: None,
458 };
459 let path = std::path::Path::new("/dev/null");
460 assert!(matches!(
461 replay_file(path, &opts),
462 Err(ReplayError::NotSupported)
463 ));
464 }
465}