use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, Ordering};
use crate::afpacket::{ffi, ring::MmapRing, socket};
use crate::error::Error;
/// A writable handle to one frame of the TX ring, produced by `Injector::allocate`.
///
/// Fill the payload through `data_mut`, record its length with `set_len`, then call
/// `send` to mark the frame `TP_STATUS_SEND_REQUEST`. Dropping the slot without calling
/// `send` stores `TP_STATUS_AVAILABLE` into the frame's status word instead.
pub struct TxSlot<'a> {
/// Pointer to the first byte of the frame; the frame header and the status word are
/// accessed through this pointer.
frame_ptr: NonNull<u8>,
/// Byte offset from `frame_ptr` to the start of the payload area.
data_offset: usize,
/// Length of the payload area exposed by `data_mut` and the upper bound enforced by `set_len`.
max_len: usize,
/// Payload length recorded by `set_len`; written into the frame header by `send`.
len: usize,
/// Set by `send`; the `Drop` impl checks it to decide whether to release the frame.
sent: bool,
/// Counter incremented by `send`; `Injector::allocate` points it at `Injector::pending`.
pending: &'a mut u32,
}
impl<'a> TxSlot<'a> {
    /// Returns the frame's whole payload area (`max_len` bytes) as a mutable slice.
    ///
    /// The slice always spans the full capacity, independent of any length previously
    /// recorded with [`TxSlot::set_len`].
    pub fn data_mut(&mut self) -> &mut [u8] {
        let payload_offset = self.data_offset;
        let payload = self
            .frame_ptr
            .as_ptr()
            .map_addr(|addr| addr + payload_offset);
        // SAFETY: NOTE(review): relies on the creator of this slot guaranteeing that
        // `data_offset + max_len` bytes starting at `frame_ptr` belong to this frame and
        // that nothing else accesses them while the slot is alive — confirm in
        // `Injector::allocate`.
        unsafe { std::slice::from_raw_parts_mut(payload, self.max_len) }
    }

    /// Records how many payload bytes should be transmitted.
    ///
    /// # Panics
    /// Panics if `len` is greater than the frame's payload capacity.
    pub fn set_len(&mut self, len: usize) {
        let capacity = self.max_len;
        assert!(
            len <= capacity,
            "packet length {len} exceeds frame capacity {}",
            capacity
        );
        self.len = len;
    }

    /// Hands the frame over for transmission and consumes the slot.
    ///
    /// Writes the recorded length and payload offset into the frame header, then stores
    /// `TP_STATUS_SEND_REQUEST` into the status word with `Release` ordering and bumps
    /// the shared pending counter. Nothing in this method issues a syscall.
    pub fn send(mut self) {
        let frame = self.frame_ptr.as_ptr();
        let header = frame.cast::<libc::tpacket_hdr>();
        let wire_len = self.len as u32;
        let payload_offset = self.data_offset as u16;
        // SAFETY: NOTE(review): assumes the frame starts with a `libc::tpacket_hdr`;
        // confirm this matches the packet version configured on the socket.
        unsafe {
            (*header).tp_len = wire_len;
            (*header).tp_snaplen = wire_len;
            (*header).tp_mac = payload_offset;
            (*header).tp_net = payload_offset;
        }
        // The status word is treated as the first 32 bits of the frame. The Release
        // store comes after the header writes so they are published together with it.
        // SAFETY: same frame pointer as above, reinterpreted as an atomic 32-bit word.
        let status_word = unsafe { &*frame.cast::<AtomicU32>() };
        status_word.store(ffi::TP_STATUS_SEND_REQUEST, Ordering::Release);
        // Mark as sent before touching the counter so `Drop` never releases this frame.
        self.sent = true;
        *self.pending += 1;
    }
}
impl Drop for TxSlot<'_> {
    /// Releases the frame if the slot was never sent.
    ///
    /// A slot consumed by `send` is left untouched. Otherwise the frame's status word is
    /// set back to `TP_STATUS_AVAILABLE`. This does not modify the injector's ring
    /// cursor.
    fn drop(&mut self) {
        if self.sent {
            return;
        }
        // SAFETY: NOTE(review): assumes `frame_ptr` still points at a live, suitably
        // aligned frame whose first 32 bits are the status word.
        let status_word = unsafe { &*self.frame_ptr.as_ptr().cast::<AtomicU32>() };
        status_word.store(ffi::TP_STATUS_AVAILABLE, Ordering::Release);
    }
}
/// Packet transmitter backed by a memory-mapped AF_PACKET TX ring.
///
/// Frames are claimed with `allocate`, filled through the returned `TxSlot`, and handed
/// to the kernel by `flush`.
pub struct Injector {
/// Memory mapping of the TX ring; frame pointers are derived from `ring.base()`.
ring: MmapRing,
/// Socket file descriptor used for polling and for the `flush` kick.
fd: OwnedFd,
/// Index of the next frame `allocate` will inspect.
current_frame: usize,
/// Total number of frames in the ring.
frame_count: usize,
/// Size of one frame in bytes, header included.
frame_size: usize,
/// Byte offset from the start of a frame to its payload area.
data_offset: usize,
/// Number of slots sent since the last successful `flush`.
pending: u32,
}
impl std::fmt::Debug for Injector {
    /// Formats the ring geometry and the pending counter; the mapping, the file
    /// descriptor and the cursor are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Injector");
        out.field("frame_count", &self.frame_count);
        out.field("frame_size", &self.frame_size);
        out.field("pending", &self.pending);
        out.finish()
    }
}
impl Injector {
    /// Opens an injector on `interface` using the builder's default settings.
    ///
    /// # Errors
    /// Propagates any error returned by `InjectorBuilder::build`.
    pub fn open(interface: &str) -> Result<Self, Error> {
        Self::builder().interface(interface).build()
    }

    /// Returns a builder initialised with the default ring configuration.
    pub fn builder() -> InjectorBuilder {
        InjectorBuilder::default()
    }

    /// Returns a pointer to the first byte of frame `index`.
    ///
    /// Frames are addressed as `index * frame_size` from the ring base.
    /// NOTE(review): this is only the kernel's layout when `frame_size` evenly divides
    /// the ring block size — confirm that the builder guarantees it.
    fn frame_ptr(&self, index: usize) -> NonNull<u8> {
        let offset = index * self.frame_size;
        let ptr = self.ring.base().as_ptr().map_addr(|a| a + offset);
        // SAFETY: NOTE(review): assumes the ring base is non-null and `index` is within
        // the ring, so the resulting address is non-null.
        unsafe { NonNull::new_unchecked(ptr) }
    }

    /// Loads the status word (first 32 bits) of frame `index` with `Acquire` ordering.
    fn read_frame_status(&self, index: usize) -> u32 {
        let ptr = self.frame_ptr(index).as_ptr().cast::<AtomicU32>();
        // SAFETY: NOTE(review): assumes every frame start is 4-byte aligned and mapped.
        unsafe { &*ptr }.load(Ordering::Acquire)
    }

    /// Counts the frames whose current status satisfies `matches_status`.
    fn count_frames(&self, matches_status: impl Fn(u32) -> bool) -> usize {
        (0..self.frame_count)
            .filter(|&index| matches_status(self.read_frame_status(index)))
            .count()
    }

    /// Maximum payload size of a single frame, in bytes.
    #[inline]
    pub fn frame_capacity(&self) -> usize {
        self.frame_size - self.data_offset
    }

    /// Total number of frames in the ring.
    #[inline]
    pub fn frame_count(&self) -> usize {
        self.frame_count
    }

    /// Number of frames currently marked `TP_STATUS_AVAILABLE` (scans the whole ring).
    pub fn available_slots(&self) -> usize {
        self.count_frames(|status| status == ffi::TP_STATUS_AVAILABLE)
    }

    /// Number of frames currently marked `TP_STATUS_WRONG_FORMAT` (scans the whole ring).
    pub fn rejected_slots(&self) -> usize {
        self.count_frames(|status| status == ffi::TP_STATUS_WRONG_FORMAT)
    }

    /// Number of frames currently marked `TP_STATUS_SEND_REQUEST` or `TP_STATUS_SENDING`.
    ///
    /// This is read from the ring itself and is independent of the internal counter
    /// that `flush` reports.
    pub fn pending_count(&self) -> usize {
        self.count_frames(|status| {
            status == ffi::TP_STATUS_SEND_REQUEST || status == ffi::TP_STATUS_SENDING
        })
    }

    /// Blocks until no frame is queued or in flight, or until `timeout` elapses.
    ///
    /// The ring is re-checked at least every 10 ms. This method does not call `flush`,
    /// so frames that were sent but never flushed stay queued.
    ///
    /// # Errors
    /// Returns `Error::Io` with `ErrorKind::TimedOut` when the deadline passes first,
    /// or the poll error if polling the socket fails.
    pub fn wait_drained(&mut self, timeout: std::time::Duration) -> Result<(), Error> {
        use std::time::{Duration, Instant};
        const POLL_SLICE: Duration = Duration::from_millis(10);

        // `Instant + Duration` panics when the sum is not representable (for example
        // `Duration::MAX`); in that case treat the wait as unbounded instead.
        let deadline = Instant::now().checked_add(timeout);
        loop {
            if self.pending_count() == 0 {
                return Ok(());
            }
            let slice = match deadline {
                None => POLL_SLICE,
                Some(deadline) => match deadline.checked_duration_since(Instant::now()) {
                    Some(remaining) => remaining.min(POLL_SLICE),
                    None => {
                        return Err(Error::Io(std::io::Error::from(
                            std::io::ErrorKind::TimedOut,
                        )));
                    }
                },
            };
            let mut pfds = [nix::poll::PollFd::new(
                self.fd.as_fd(),
                nix::poll::PollFlags::POLLOUT,
            )];
            crate::syscall::poll_eintr_safe(&mut pfds, slice).map_err(Error::Io)?;
        }
    }

    /// Claims the next usable frame for a packet of up to `len` bytes.
    ///
    /// Scans at most one full lap of the ring starting at the cursor. A frame marked
    /// `TP_STATUS_AVAILABLE` is handed out directly. A frame marked
    /// `TP_STATUS_WRONG_FORMAT` is reset, reported with a warning, and handed out as
    /// well. Frames in any other state are skipped.
    ///
    /// Returns `None` when `len` exceeds the frame capacity or when no frame is usable.
    pub fn allocate(&mut self, len: usize) -> Option<TxSlot<'_>> {
        let capacity = self.frame_size - self.data_offset;
        if len > capacity {
            return None;
        }

        let mut claimed = None;
        let mut reclaimed_rejected = false;
        for _ in 0..self.frame_count {
            let index = self.current_frame;
            self.current_frame = (index + 1) % self.frame_count;
            match self.read_frame_status(index) {
                ffi::TP_STATUS_AVAILABLE => {}
                ffi::TP_STATUS_WRONG_FORMAT => {
                    // The reset makes the frame available, so reuse it right away
                    // instead of skipping a frame that was just freed.
                    self.reset_slot(index);
                    reclaimed_rejected = true;
                }
                _ => continue,
            }
            claimed = Some(index);
            break;
        }

        // Report the rejection even though allocation succeeded; otherwise the
        // diagnostic is lost whenever a frame can be handed out.
        if reclaimed_rejected {
            tracing::warn!(
                count = 1u32,
                "AF_PACKET TX: kernel rejected frames (WRONG_FORMAT) — check packet contents"
            );
        }

        let index = claimed?;
        Some(TxSlot {
            frame_ptr: self.frame_ptr(index),
            data_offset: self.data_offset,
            max_len: capacity,
            len: 0,
            sent: false,
            pending: &mut self.pending,
        })
    }

    /// Stores `TP_STATUS_AVAILABLE` into the status word of frame `idx`.
    fn reset_slot(&self, idx: usize) {
        let ptr = self.frame_ptr(idx).as_ptr().cast::<AtomicU32>();
        // SAFETY: NOTE(review): same alignment/mapping assumption as `read_frame_status`.
        unsafe { &*ptr }.store(ffi::TP_STATUS_AVAILABLE, Ordering::Release);
    }

    /// Kicks the socket so queued frames are transmitted.
    ///
    /// Returns the number of slots sent since the previous successful flush, or `0`
    /// without issuing a syscall when nothing is pending.
    ///
    /// # Errors
    /// Returns `Error::Io` if the kick fails; the pending counter is left unchanged in
    /// that case.
    pub fn flush(&mut self) -> Result<usize, Error> {
        if self.pending == 0 {
            return Ok(0);
        }
        crate::syscall::sendto_kick_eintr_safe(self.fd.as_raw_fd(), 0).map_err(Error::Io)?;
        let count = self.pending as usize;
        self.pending = 0;
        Ok(count)
    }
}
impl AsFd for Injector {
    /// Borrows the injector's socket file descriptor.
    fn as_fd(&self) -> BorrowedFd<'_> {
        AsFd::as_fd(&self.fd)
    }
}
impl AsRawFd for Injector {
    /// Returns the raw value of the injector's socket file descriptor.
    fn as_raw_fd(&self) -> std::os::fd::RawFd {
        AsRawFd::as_raw_fd(&self.fd)
    }
}
// Trait adapter: both methods forward to the inherent `Injector` methods of the same
// name. The explicit `Injector::method(self, ..)` form names the inherent method
// directly rather than relying on method-call resolution.
impl crate::traits::PacketSink for Injector {
// Forwards to the inherent `Injector::allocate`.
fn allocate(&mut self, len: usize) -> Option<TxSlot<'_>> {
Injector::allocate(self, len)
}
// Forwards to the inherent `Injector::flush`.
fn flush(&mut self) -> Result<usize, Error> {
Injector::flush(self)
}
}
impl Drop for Injector {
    /// Makes a final best-effort `flush`; a failure is logged as a warning and
    /// otherwise ignored.
    fn drop(&mut self) {
        match self.flush() {
            Ok(_) => {}
            Err(e) => {
                tracing::warn!(error = %e, "Injector::drop final flush failed");
            }
        }
    }
}
// SAFETY: NOTE(review): this asserts that an `Injector` may be moved to another thread.
// Presumably the manual impl is needed because `MmapRing` holds a raw pointer to the
// mapping — confirm that neither `MmapRing` nor the socket has thread-affine state.
unsafe impl Send for Injector {}
/// Builder for `Injector`; obtain one from `Injector::builder()` or `Default::default()`.
#[must_use]
pub struct InjectorBuilder {
/// Name of the interface to bind to; `build` fails when it is unset.
interface: Option<String>,
/// Size of one ring frame in bytes (defaults to 2048).
frame_size: usize,
/// Requested number of ring frames (defaults to 256); `build` may round it up.
frame_count: usize,
/// Whether `build` sets the `PACKET_QDISC_BYPASS` socket option (defaults to `false`).
qdisc_bypass: bool,
}
impl Default for InjectorBuilder {
fn default() -> Self {
Self {
interface: None,
frame_size: 2048,
frame_count: 256,
qdisc_bypass: false,
}
}
}
impl InjectorBuilder {
pub fn interface(mut self, name: &str) -> Self {
self.interface = Some(name.to_string());
self
}
pub fn frame_size(mut self, bytes: usize) -> Self {
self.frame_size = bytes;
self
}
pub fn frame_count(mut self, n: usize) -> Self {
self.frame_count = n;
self
}
pub fn qdisc_bypass(mut self, enable: bool) -> Self {
self.qdisc_bypass = enable;
self
}
pub fn build(self) -> Result<Injector, Error> {
let interface = self
.interface
.ok_or_else(|| Error::Config("interface is required".into()))?;
crate::afpacket::validate_frame_size(self.frame_size)?;
if self.frame_count == 0 {
return Err(Error::Config("frame_count must be > 0".into()));
}
let page_size = 4096usize;
let block_size = self.frame_size.max(page_size).next_power_of_two();
let frames_per_block = block_size / self.frame_size;
let block_count = self.frame_count.div_ceil(frames_per_block);
let actual_frame_count = block_count * frames_per_block;
let mut req: ffi::tpacket_req3 = unsafe { std::mem::zeroed() };
req.tp_block_size = block_size as u32;
req.tp_block_nr = block_count as u32;
req.tp_frame_size = self.frame_size as u32;
req.tp_frame_nr = actual_frame_count as u32;
let fd = socket::create_packet_socket()?;
socket::set_packet_version(fd.as_fd())?;
socket::set_tx_ring(fd.as_fd(), &req)?;
let ring_size = block_size * block_count;
let ring = MmapRing::new(fd.as_fd(), ring_size, block_size, block_count)?;
let ifindex = socket::resolve_interface(&interface)?;
socket::bind_to_interface(fd.as_fd(), ifindex)?;
if self.qdisc_bypass {
let val: libc::c_int = 1;
crate::sockopt::raw_setsockopt(
fd.as_fd(),
ffi::SOL_PACKET,
ffi::PACKET_QDISC_BYPASS,
&val,
"PACKET_QDISC_BYPASS",
)?;
}
let data_offset = ffi::tpacket_align(std::mem::size_of::<libc::tpacket_hdr>());
Ok(Injector {
ring,
fd,
current_frame: 0,
frame_count: actual_frame_count,
frame_size: self.frame_size,
data_offset,
pending: 0,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Building without an interface must fail with a configuration error.
    #[test]
    fn builder_rejects_missing_interface() {
        let outcome = InjectorBuilder::default().build();
        assert!(matches!(outcome, Err(Error::Config(_))));
    }

    /// A 100-byte frame size must be rejected as a configuration error.
    #[test]
    fn builder_rejects_bad_frame_size() {
        let builder = InjectorBuilder::default().interface("lo").frame_size(100);
        let outcome = builder.build();
        assert!(matches!(outcome, Err(Error::Config(_))));
    }

    /// A ring with zero frames must be rejected as a configuration error.
    #[test]
    fn builder_rejects_zero_frame_count() {
        let builder = InjectorBuilder::default().interface("lo").frame_count(0);
        let outcome = builder.build();
        assert!(matches!(outcome, Err(Error::Config(_))));
    }

    /// The default builder uses 2048-byte frames, 256 frames and no qdisc bypass.
    #[test]
    fn builder_defaults() {
        let InjectorBuilder {
            frame_size,
            frame_count,
            qdisc_bypass,
            ..
        } = InjectorBuilder::default();
        assert_eq!(frame_size, 2048);
        assert_eq!(frame_count, 256);
        assert!(!qdisc_bypass);
    }

    /// With 2048-byte frames the payload capacity is above 1500 bytes and below the
    /// full frame size.
    #[test]
    fn frame_capacity_arithmetic() {
        let header_len = ffi::tpacket_align(std::mem::size_of::<libc::tpacket_hdr>());
        let payload_capacity = 2048 - header_len;
        assert!((1501..2048).contains(&payload_capacity));
    }
}