use std::path::Path;
use std::time::{Duration, Instant};
use crate::bpf::BpfExpr;
use crate::error::ReplayError;
use crate::filter::{Filter, PacketMeta};
use crate::pcap;
/// Pacing policy applied between packets while replaying a capture.
#[derive(Debug, Clone)]
pub enum ReplaySpeed {
/// Reproduce the capture's own inter-packet gaps unchanged.
RealTime,
/// Divide every capture gap by this factor (so `2.0` replays twice as fast).
Multiplier(f64),
/// Send back-to-back with no pacing delay.
Max,
/// Send at a fixed rate of this many packets per second; `0` disables pacing.
Pps(u64),
}
impl ReplaySpeed {
    /// Parses a user-supplied speed string.
    ///
    /// * `"max"` (any ASCII case) yields [`ReplaySpeed::Max`].
    /// * A positive, finite number yields [`ReplaySpeed::RealTime`] when it equals `1`
    ///   (within `f64::EPSILON`), otherwise [`ReplaySpeed::Multiplier`].
    ///
    /// Returns `None` for anything else: empty or non-numeric text, zero, negative
    /// values, `NaN` and infinities. `"inf"` is a valid `f64` literal but not a usable
    /// multiplier; `"max"` is the way to request unpaced replay.
    /// [`ReplaySpeed::Pps`] is never produced by this function.
    pub fn parse(s: &str) -> Option<Self> {
        if s.eq_ignore_ascii_case("max") {
            return Some(ReplaySpeed::Max);
        }

        let factor: f64 = s.parse().ok()?;
        // `str::parse::<f64>` accepts "inf", "infinity" and "nan"; none of them is a
        // meaningful scale factor, so require a finite, strictly positive value.
        if !factor.is_finite() || factor <= 0.0 {
            return None;
        }

        if (factor - 1.0).abs() < f64::EPSILON {
            Some(ReplaySpeed::RealTime)
        } else {
            Some(ReplaySpeed::Multiplier(factor))
        }
    }
}
/// Settings for one replay run.
pub struct ReplayOptions {
/// Names of the interfaces to transmit on; every replayed packet is sent to each of them.
pub interfaces: Vec<String>,
/// Pacing policy used to compute the delay before each send.
pub speed: ReplaySpeed,
/// Structural packet filter; consulted only when it is non-empty.
pub filter: Filter,
/// Optional BPF expression; when present a packet must also satisfy it to be replayed.
pub bpf_filter: Option<BpfExpr>,
}
/// Totals accumulated over a replay run.
#[derive(Debug)]
pub struct ReplayReport {
/// Number of capture packets replayed; counted once per packet, not once per interface.
pub packets_sent: u64,
/// Sum of the data lengths of the replayed packets, likewise counted once per packet.
pub bytes_sent: u64,
}
/// Replays the capture file at `input` according to `opts` and returns the send totals.
///
/// This is a thin entry point that forwards to the backend selected at compile time by
/// `target_os`; on targets other than Linux that backend returns
/// [`ReplayError::NotSupported`].
pub fn replay_file(input: &Path, opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
platform::replay_impl(input, opts)
}
/// Computes how long to wait before sending the next packet.
///
/// * `speed` – pacing policy.
/// * `pkt_ts_ns` – capture timestamp of the packet about to be sent, in nanoseconds.
/// * `first_ts_ns` – capture timestamp used as the origin for gaps; filled in with
///   `pkt_ts_ns` on the first `RealTime`/`Multiplier` call and left untouched by the
///   other policies.
/// * `sent_count` – number of packets already sent; only `Pps` uses it.
/// * `start_time` – instant at which the replay started.
///
/// Returns the time remaining until the packet is due, or [`Duration::ZERO`] when it is
/// already due or overdue. `Max`, `Pps(0)` and a `Multiplier` that is zero, negative or
/// NaN never pace and always return [`Duration::ZERO`].
pub(crate) fn compute_delay(
    speed: &ReplaySpeed,
    pkt_ts_ns: u64,
    first_ts_ns: &mut Option<u64>,
    sent_count: u64,
    start_time: &Instant,
) -> Duration {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    // Offset from `start_time`, in nanoseconds, at which this packet is due.
    let due_ns: u64 = match speed {
        ReplaySpeed::Max | ReplaySpeed::Pps(0) => return Duration::ZERO,
        ReplaySpeed::Pps(pps) => {
            // Widen to u128 so `sent_count * 1e9` cannot overflow, then saturate rather
            // than truncate when narrowing back.
            let due = u128::from(sent_count) * NANOS_PER_SEC / u128::from(*pps);
            due.min(u128::from(u64::MAX)) as u64
        }
        ReplaySpeed::RealTime => {
            let origin = *first_ts_ns.get_or_insert(pkt_ts_ns);
            pkt_ts_ns.saturating_sub(origin)
        }
        ReplaySpeed::Multiplier(factor) => {
            let origin = *first_ts_ns.get_or_insert(pkt_ts_ns);
            let gap_ns = pkt_ts_ns.saturating_sub(origin);
            // A zero factor would turn any gap into +inf, i.e. a u64::MAX-nanosecond
            // sleep. Treat unusable factors like `Pps(0)`: no pacing.
            if factor.is_nan() || *factor <= 0.0 {
                return Duration::ZERO;
            }
            // Float-to-int `as` saturates, so very small factors clamp to u64::MAX.
            (gap_ns as f64 / *factor) as u64
        }
    };

    Duration::from_nanos(due_ns).saturating_sub(start_time.elapsed())
}
#[cfg(target_os = "linux")]
mod platform {
use socket2::{Domain, Protocol, Socket, Type};
use super::*;
const AF_PACKET: i32 = 17;
const ETH_P_ALL_NBO: i32 = (0x0003_u16.to_be()) as i32;
#[repr(C)]
struct SockAddrLl {
sll_family: u16,
sll_protocol: u16,
sll_ifindex: i32,
sll_hatype: u16,
sll_pkttype: u8,
sll_halen: u8,
sll_addr: [u8; 8],
}
fn read_ifindex(iface: &str) -> Result<i32, ReplayError> {
let path = format!("/sys/class/net/{iface}/ifindex");
let s = std::fs::read_to_string(&path)
.map_err(|_| ReplayError::UnknownInterface(iface.to_owned()))?;
s.trim()
.parse::<i32>()
.map_err(|_| ReplayError::UnknownInterface(iface.to_owned()))
}
fn open_raw_socket(iface: &str) -> Result<Socket, ReplayError> {
let sock = Socket::new(
Domain::from(AF_PACKET),
Type::RAW,
Some(Protocol::from(ETH_P_ALL_NBO)),
)
.map_err(|e| {
if e.kind() == std::io::ErrorKind::PermissionDenied {
ReplayError::PermissionDenied(
"creating a raw AF_PACKET socket requires CAP_NET_RAW; \
run as root or: sudo setcap cap_net_raw+eip <binary>"
.to_owned(),
)
} else {
ReplayError::Io(e)
}
})?;
let ifindex = read_ifindex(iface)?;
let (_, addr) = unsafe {
socket2::SockAddr::try_init(|storage, len| {
let sa = &mut *storage.cast::<SockAddrLl>();
sa.sll_family = AF_PACKET as u16;
sa.sll_protocol = 0x0003_u16.to_be(); sa.sll_ifindex = ifindex;
sa.sll_hatype = 0;
sa.sll_pkttype = 0;
sa.sll_halen = 0;
sa.sll_addr = [0u8; 8];
*len = std::mem::size_of::<SockAddrLl>() as _;
Ok(())
})
}
.map_err(ReplayError::Io)?;
sock.bind(&addr).map_err(|e| {
if e.kind() == std::io::ErrorKind::PermissionDenied {
ReplayError::PermissionDenied("binding raw socket requires CAP_NET_RAW".to_owned())
} else {
ReplayError::Io(e)
}
})?;
Ok(sock)
}
pub fn replay_impl(input: &Path, opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
let sockets: Vec<Socket> = opts
.interfaces
.iter()
.map(|iface| open_raw_socket(iface))
.collect::<Result<_, _>>()?;
let has_filter = !opts.filter.is_empty() || opts.bpf_filter.is_some();
let iter =
pcap::open_with_payload(input).map_err(|e| ReplayError::PcapParse(e.to_string()))?;
let mut packets_sent: u64 = 0;
let mut bytes_sent: u64 = 0;
let mut first_ts_ns: Option<u64> = None;
let start_time = Instant::now();
for result in iter {
let pkt = result.map_err(|e| ReplayError::PcapParse(e.to_string()))?;
if has_filter {
let meta = PacketMeta::from_packet(
pkt.info.timestamp_ns,
pkt.info.captured_len,
&pkt.data,
);
let struct_pass = opts.filter.is_empty() || opts.filter.matches(&meta);
let bpf_pass = opts
.bpf_filter
.as_ref()
.map(|b| b.eval(&meta))
.unwrap_or(true);
if !struct_pass || !bpf_pass {
continue;
}
}
let delay = compute_delay(
&opts.speed,
pkt.info.timestamp_ns,
&mut first_ts_ns,
packets_sent,
&start_time,
);
if !delay.is_zero() {
std::thread::sleep(delay);
}
for sock in &sockets {
sock.send(&pkt.data).map_err(ReplayError::Io)?;
}
packets_sent += 1;
bytes_sent += pkt.data.len() as u64;
}
Ok(ReplayReport {
packets_sent,
bytes_sent,
})
}
}
/// Fallback backend compiled on every target other than Linux.
#[cfg(not(target_os = "linux"))]
mod platform {
use super::*;
/// Always fails with [`ReplayError::NotSupported`]; both arguments are ignored.
pub fn replay_impl(_input: &Path, _opts: &ReplayOptions) -> Result<ReplayReport, ReplayError> {
Err(ReplayError::NotSupported)
}
}
/// Unit tests for speed parsing, delay computation and the platform entry point.
#[cfg(test)]
mod tests {
    use super::*;

    /// "max" is recognised regardless of ASCII case.
    #[test]
    fn test_replay_speed_parse_max() {
        assert!(matches!(ReplaySpeed::parse("max"), Some(ReplaySpeed::Max)));
        assert!(matches!(ReplaySpeed::parse("MAX"), Some(ReplaySpeed::Max)));
        assert!(matches!(ReplaySpeed::parse("Max"), Some(ReplaySpeed::Max)));
    }

    /// A factor of exactly one maps to `RealTime` rather than `Multiplier(1.0)`.
    #[test]
    fn test_replay_speed_parse_real_time() {
        assert!(matches!(
            ReplaySpeed::parse("1.0"),
            Some(ReplaySpeed::RealTime)
        ));
        assert!(matches!(
            ReplaySpeed::parse("1"),
            Some(ReplaySpeed::RealTime)
        ));
    }

    /// Any other positive number becomes a multiplier with that value.
    #[test]
    fn test_replay_speed_parse_multiplier() {
        assert!(matches!(
            ReplaySpeed::parse("2.0"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 2.0).abs() < 1e-9
        ));
        assert!(matches!(
            ReplaySpeed::parse("0.5"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 0.5).abs() < 1e-9
        ));
        assert!(matches!(
            ReplaySpeed::parse("10"),
            Some(ReplaySpeed::Multiplier(f)) if (f - 10.0).abs() < 1e-9
        ));
    }

    /// Empty, non-numeric, negative and zero inputs are rejected.
    #[test]
    fn test_replay_speed_parse_invalid() {
        assert!(ReplaySpeed::parse("").is_none());
        assert!(ReplaySpeed::parse("abc").is_none());
        assert!(ReplaySpeed::parse("-1.0").is_none());
        assert!(ReplaySpeed::parse("0").is_none());
        assert!(ReplaySpeed::parse("0.0").is_none());
    }

    /// `Max` never delays.
    #[test]
    fn test_compute_delay_max_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Max, 1_000_000_000, &mut first_ts, 0, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// The first real-time packet defines the origin, so it is due immediately.
    #[test]
    fn test_compute_delay_real_time_first_packet_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(
            &ReplaySpeed::RealTime,
            1_000_000_000,
            &mut first_ts,
            0,
            &start,
        );
        assert!(d < Duration::from_millis(10));
    }

    /// With nothing sent yet the pps target offset is zero.
    #[test]
    fn test_compute_delay_pps_first_packet_is_zero() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(1000), 0, &mut first_ts, 0, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// A rate of zero must not divide by zero; it simply disables pacing.
    #[test]
    fn test_compute_delay_pps_zero_is_safe() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(0), 0, &mut first_ts, 5, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// At 1000 pps the second packet is due about one millisecond after the start.
    #[test]
    fn test_compute_delay_pps_spacing() {
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(1_000), 0, &mut first_ts, 1, &start);
        assert!(
            d >= Duration::from_micros(990) && d <= Duration::from_millis(2),
            "expected ~1 ms delay, got {d:?}"
        );
    }

    /// `sent_count * 1e9` exceeds `u64::MAX` here; the result must still be ~2 s.
    #[test]
    fn test_compute_delay_pps_no_overflow_at_high_count() {
        let pps: u64 = 10_000_000_000;
        let sent_count: u64 = 20_000_000_000;
        let mut first_ts = None;
        let start = Instant::now();
        let d = compute_delay(&ReplaySpeed::Pps(pps), 0, &mut first_ts, sent_count, &start);
        assert!(
            d >= Duration::from_millis(1_990),
            "delay wrapped or saturated: got {d:?}"
        );
    }

    /// A packet whose target time has already passed is sent without delay.
    #[test]
    fn test_compute_delay_pps_past_target_is_zero() {
        let mut first_ts = None;
        // `Instant - Duration` panics when the result is not representable (for example
        // when the monotonic clock is younger than 10 s), so subtract with a check.
        let start = match Instant::now().checked_sub(Duration::from_secs(10)) {
            Some(earlier) => earlier,
            None => return,
        };
        let d = compute_delay(&ReplaySpeed::Pps(1_000), 0, &mut first_ts, 1, &start);
        assert_eq!(d, Duration::ZERO);
    }

    /// Replaying to a nonexistent interface fails during socket setup. Which error is
    /// reported may depend on whether the test process holds CAP_NET_RAW.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_replay_unknown_interface_returns_error() {
        use crate::filter::Filter;

        let opts = ReplayOptions {
            interfaces: vec!["nonexistent_iface_xyz999".to_owned()],
            speed: ReplaySpeed::Max,
            filter: Filter::default(),
            bpf_filter: None,
        };

        // Minimal little-endian pcap global header with no packet records.
        let mut pcap_bytes = Vec::new();
        pcap_bytes.extend_from_slice(&0xa1b2_c3d4u32.to_le_bytes()); // magic
        pcap_bytes.extend_from_slice(&2u16.to_le_bytes()); // version major
        pcap_bytes.extend_from_slice(&4u16.to_le_bytes()); // version minor
        pcap_bytes.extend_from_slice(&0i32.to_le_bytes()); // thiszone
        pcap_bytes.extend_from_slice(&0u32.to_le_bytes()); // sigfigs
        pcap_bytes.extend_from_slice(&65535u32.to_le_bytes()); // snaplen
        pcap_bytes.extend_from_slice(&1i32.to_le_bytes()); // link type

        // Per-process file name so concurrent test runs do not overwrite each other.
        let path = std::env::temp_dir().join(format!(
            "replay_test_no_iface_{}.pcap",
            std::process::id()
        ));
        std::fs::write(&path, &pcap_bytes).unwrap();

        let result = replay_file(&path, &opts);
        // Remove the fixture before asserting so a failure does not leave it behind.
        let _ = std::fs::remove_file(&path);

        let err = result.unwrap_err();
        assert!(
            matches!(
                err,
                ReplayError::PermissionDenied(_) | ReplayError::UnknownInterface(_)
            ),
            "expected PermissionDenied or UnknownInterface, got: {err}"
        );
    }

    /// Off Linux the entry point reports that replay is unsupported.
    #[cfg(not(target_os = "linux"))]
    #[test]
    fn test_replay_not_supported_on_non_linux() {
        use crate::filter::Filter;

        let opts = ReplayOptions {
            interfaces: vec!["eth0".to_owned()],
            speed: ReplaySpeed::RealTime,
            filter: Filter::default(),
            bpf_filter: None,
        };
        let path = std::path::Path::new("/dev/null");
        assert!(matches!(
            replay_file(path, &opts),
            Err(ReplayError::NotSupported)
        ));
    }
}