#[cfg(feature = "af-xdp")]
mod batch;
pub(crate) mod ffi;
#[cfg(feature = "af-xdp")]
mod ring;
#[cfg(feature = "af-xdp")]
mod socket;
mod stats;
#[cfg(feature = "af-xdp")]
mod umem;
#[cfg(feature = "af-xdp")]
pub use batch::{XdpBatch, XdpBatchIter, XdpPacket};
pub use stats::XdpStats;
use std::os::fd::{AsFd, AsRawFd};
#[cfg(feature = "af-xdp")]
use std::os::fd::{BorrowedFd, OwnedFd};
#[cfg(feature = "af-xdp")]
use std::time::Duration;
#[cfg(feature = "af-xdp")]
use ring::{CompletionRing, FillRing, RxRing, TxRing};
#[cfg(feature = "af-xdp")]
use umem::Umem;
use crate::error::Error;
#[cfg(feature = "af-xdp")]
use crate::packet::{OwnedPacket, Timestamp};
/// Selects how many UMEM frames `XdpSocketBuilder::build` posts to the fill
/// ring before binding the socket.
///
/// In every variant the amount is capped at the smaller of the number of free
/// UMEM frames and the ring size. Frames that are not posted stay in the UMEM
/// free list, which is where `XdpSocket::send` allocates from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum XdpMode {
/// Post every available frame (up to the cap) to the fill ring.
Rx,
/// Post no frames to the fill ring.
Tx,
/// Post half of the capped amount to the fill ring. This is the default.
#[default]
RxTx,
/// Post a caller-chosen number of frames.
Custom {
/// Requested number of frames; clamped to the cap described on the enum.
prefill: u32,
},
}
/// Configuration collected before an `XdpSocket` is created.
///
/// Every setter consumes and returns the builder so calls can be chained;
/// `build` consumes it and performs the actual socket setup.
#[derive(Debug, Clone)]
#[must_use]
pub struct XdpSocketBuilder {
// Interface name; `validate` rejects the builder while this is `None`.
interface: Option<String>,
// Queue id handed to the bind call.
queue_id: u32,
// Size of one UMEM frame; must be non-zero.
frame_size: usize,
// Number of UMEM frames; must be non-zero. `build` also derives the ring
// size from it by rounding up to a power of two.
frame_count: usize,
// When true, `build` adds the `XDP_USE_NEED_WAKEUP` bind flag.
need_wakeup: bool,
// Decides how many frames are posted to the fill ring during `build`.
mode: XdpMode,
// Raw fd recorded by `shared_umem`. The value 0 is the sentinel for
// "no shared UMEM": `build` compares against 0 to decide whether to
// register its own UMEM and whether to set `XDP_SHARED_UMEM`.
shared_umem_fd: u32,
}
impl Default for XdpSocketBuilder {
    /// Returns a builder with no interface selected, queue 0, 4096 frames of
    /// size 4096, need-wakeup enabled, the default [`XdpMode`] and no shared
    /// UMEM.
    fn default() -> Self {
        const DEFAULT_FRAME_SIZE: usize = 4096;
        const DEFAULT_FRAME_COUNT: usize = 4096;
        // 0 is the "not sharing a UMEM" sentinel checked by `build`.
        const NO_SHARED_UMEM: u32 = 0;

        Self {
            interface: None,
            queue_id: 0,
            frame_size: DEFAULT_FRAME_SIZE,
            frame_count: DEFAULT_FRAME_COUNT,
            need_wakeup: true,
            mode: XdpMode::default(),
            shared_umem_fd: NO_SHARED_UMEM,
        }
    }
}
impl XdpSocketBuilder {
pub fn interface(mut self, name: &str) -> Self {
self.interface = Some(name.to_string());
self
}
pub fn queue_id(mut self, id: u32) -> Self {
self.queue_id = id;
self
}
pub fn frame_size(mut self, size: usize) -> Self {
self.frame_size = size;
self
}
pub fn frame_count(mut self, count: usize) -> Self {
self.frame_count = count;
self
}
pub fn mode(mut self, mode: XdpMode) -> Self {
self.mode = mode;
self
}
pub fn need_wakeup(mut self, enable: bool) -> Self {
self.need_wakeup = enable;
self
}
pub fn shared_umem<F: AsFd>(mut self, primary: F) -> Self {
self.shared_umem_fd = primary.as_fd().as_raw_fd() as u32;
self
}
pub fn validate(&self) -> Result<&str, Error> {
let iface = self
.interface
.as_deref()
.ok_or_else(|| Error::Config("interface is required".into()))?;
if self.frame_size == 0 {
return Err(Error::Config("frame_size must be > 0".into()));
}
if self.frame_count == 0 {
return Err(Error::Config("frame_count must be > 0".into()));
}
Ok(iface)
}
/// Creates, configures and binds the AF_XDP socket described by this builder.
///
/// Steps, in order: validate the configuration, derive the ring size, resolve
/// the interface index, allocate the UMEM, create the socket, register the
/// UMEM (skipped when a shared UMEM descriptor was recorded), size and map the
/// four rings, post frames to the fill ring according to the [`XdpMode`], and
/// bind.
///
/// # Errors
///
/// Returns [`Error::Config`] when `validate` fails or when `frame_count`
/// cannot be rounded up to a power of two that fits in `u32`. Otherwise the
/// error of whichever setup step failed is propagated.
///
/// NOTE(review): when a shared UMEM descriptor is set, a local `Umem` is still
/// allocated and used for frame addresses although it is never registered with
/// the kernel. Confirm this is intended.
#[cfg(feature = "af-xdp")]
pub fn build(self) -> Result<XdpSocket, Error> {
    let iface = self.validate()?;

    // Derive the ring size with checked arithmetic before allocating anything:
    // a plain `as u32` cast would silently truncate, and `next_power_of_two`
    // overflows for values above 2^31.
    let ring_size = <u32 as std::convert::TryFrom<usize>>::try_from(self.frame_count)
        .ok()
        .and_then(u32::checked_next_power_of_two)
        .ok_or_else(|| {
            Error::Config(format!(
                "frame_count {} is too large for an AF_XDP ring",
                self.frame_count
            ))
        })?;

    let ifindex = crate::afpacket::socket::resolve_interface(iface)? as u32;
    let mut umem = Umem::new(self.frame_size, self.frame_count)?;
    let fd = socket::create_xdp_socket()?;
    if self.shared_umem_fd == 0 {
        socket::register_umem(fd.as_fd(), &umem.as_reg())?;
    }

    socket::set_ring_size(
        fd.as_fd(),
        ffi::XDP_UMEM_FILL_RING,
        ring_size,
        "XDP_UMEM_FILL_RING",
    )?;
    socket::set_ring_size(
        fd.as_fd(),
        ffi::XDP_UMEM_COMPLETION_RING,
        ring_size,
        "XDP_UMEM_COMPLETION_RING",
    )?;
    socket::set_ring_size(fd.as_fd(), ffi::XDP_RX_RING, ring_size, "XDP_RX_RING")?;
    socket::set_ring_size(fd.as_fd(), ffi::XDP_TX_RING, ring_size, "XDP_TX_RING")?;

    let offsets = socket::get_mmap_offsets(fd.as_fd())?;

    // SAFETY (applies to all four mappings below): each ring was sized on this
    // same descriptor above and the offsets were just queried from it.
    // NOTE(review): the exact contract of `mmap` lives in the `ring` module;
    // confirm nothing further is required.
    let mut fill = unsafe {
        FillRing::mmap(
            fd.as_fd(),
            ring_size,
            &offsets.fr,
            ffi::XDP_UMEM_PGOFF_FILL_RING as libc::off_t,
        )?
    };
    let comp = unsafe {
        CompletionRing::mmap(
            fd.as_fd(),
            ring_size,
            &offsets.cr,
            ffi::XDP_UMEM_PGOFF_COMPLETION_RING as libc::off_t,
        )?
    };
    let rx = unsafe {
        RxRing::mmap(
            fd.as_fd(),
            ring_size,
            &offsets.rx,
            ffi::XDP_PGOFF_RX_RING as libc::off_t,
        )?
    };
    let tx = unsafe {
        TxRing::mmap(
            fd.as_fd(),
            ring_size,
            &offsets.tx,
            ffi::XDP_PGOFF_TX_RING as libc::off_t,
        )?
    };

    // Never post more frames than exist or than the ring can hold. The result
    // is bounded by `ring_size`, so the cast back to u32 cannot truncate.
    let cap_avail = umem.available().min(ring_size as usize);
    let prefill = match self.mode {
        XdpMode::Rx => cap_avail,
        XdpMode::Tx => 0,
        XdpMode::RxTx => cap_avail / 2,
        XdpMode::Custom { prefill } => (prefill as usize).min(cap_avail),
    } as u32;

    if prefill > 0 {
        if let Some(tok) = fill.producer_reserve(prefill) {
            let mut written = 0u32;
            // Walk the slots that were actually reserved (`tok.n`), matching
            // `XdpSocket::refill`, so a short reservation can never be
            // written past its end.
            for i in 0..tok.n {
                match umem.alloc_frame() {
                    Some(addr) => {
                        fill.write_at(tok, i, addr);
                        written += 1;
                    }
                    None => break,
                }
            }
            // `prefill` never exceeds `umem.available()`, so the allocator is
            // not expected to run dry before every reserved slot is written.
            debug_assert_eq!(written, tok.n);
            if written > 0 {
                fill.producer_submit(tok);
            }
        }
    }

    let mut bind_flags: u16 = 0;
    if self.need_wakeup {
        bind_flags |= ffi::XDP_USE_NEED_WAKEUP;
    }
    if self.shared_umem_fd != 0 {
        bind_flags |= ffi::XDP_SHARED_UMEM;
    }
    socket::bind_xdp(
        fd.as_fd(),
        ifindex,
        self.queue_id,
        bind_flags,
        self.shared_umem_fd,
    )?;

    Ok(XdpSocket {
        fd,
        umem,
        fill,
        rx,
        tx,
        comp,
        need_wakeup_enabled: self.need_wakeup,
    })
}
#[cfg(not(feature = "af-xdp"))]
pub fn build(self) -> Result<XdpSocket, Error> {
Err(Error::Config(
"AF_XDP requires the 'af-xdp' feature flag".into(),
))
}
}
/// A bound AF_XDP socket together with its UMEM and its four rings.
///
/// With the `af-xdp` feature disabled the type still exists so that signatures
/// mentioning it compile, but it only holds a private marker field and
/// `XdpSocketBuilder::build` always returns an error.
pub struct XdpSocket {
// Owned socket file descriptor; also exposed through `AsFd` / `AsRawFd`.
#[cfg(feature = "af-xdp")]
fd: OwnedFd,
// Frame memory; frames are handed out with `alloc_frame` and returned with
// `free_frame` / `free_frames`.
#[cfg(feature = "af-xdp")]
umem: Umem,
// Fill ring: frame addresses are posted here by `build` and `refill`.
#[cfg(feature = "af-xdp")]
fill: FillRing,
// RX ring: descriptors are read from here by `recv` and `next_batch`.
#[cfg(feature = "af-xdp")]
rx: RxRing,
// TX ring: `send` writes one descriptor per packet here.
#[cfg(feature = "af-xdp")]
tx: TxRing,
// Completion ring: drained by `recycle_completed`, which returns the
// addresses to the UMEM.
#[cfg(feature = "af-xdp")]
comp: CompletionRing,
// Copy of the builder's `need_wakeup` setting; consulted by `flush`.
#[cfg(feature = "af-xdp")]
need_wakeup_enabled: bool,
// Private placeholder for builds without the feature. The raw-pointer
// marker also keeps the auto traits `Send` and `Sync` from applying.
#[cfg(not(feature = "af-xdp"))]
_private: std::marker::PhantomData<*const ()>,
}
impl XdpSocket {
/// Opens a socket on `interface` using the builder's default settings.
///
/// # Errors
///
/// Propagates any error returned by `XdpSocketBuilder::build`.
#[cfg(feature = "af-xdp")]
pub fn open(interface: &str) -> Result<Self, Error> {
    let builder = Self::builder().interface(interface);
    builder.build()
}
/// Returns a builder initialised with the default configuration.
#[cfg(feature = "af-xdp")]
pub fn builder() -> XdpSocketBuilder {
    Default::default()
}
/// Returns a batch view over up to 64 descriptors currently in the RX ring.
///
/// Completed TX frames are recycled first. Returns `None` when peeking the RX
/// ring yields nothing.
#[cfg(feature = "af-xdp")]
pub fn next_batch(&mut self) -> Option<XdpBatch<'_>> {
    self.recycle_completed();
    let tok = self.rx.consumer_peek(64)?;
    Some(XdpBatch::new(self, tok))
}
/// Like `next_batch`, but waits up to `timeout` for the socket to become
/// readable when the RX ring looks empty.
///
/// The outcome of the wait is not inspected. After polling, one attempt is
/// made and `Ok(None)` is returned if there is still nothing to read.
///
/// # Errors
///
/// Returns [`Error::Io`] when polling fails.
#[cfg(feature = "af-xdp")]
pub fn next_batch_blocking(
    &mut self,
    timeout: Duration,
) -> Result<Option<XdpBatch<'_>>, Error> {
    if self.rx_is_empty() {
        let readable = nix::poll::PollFlags::POLLIN;
        let mut pfds = [nix::poll::PollFd::new(self.fd.as_fd(), readable)];
        crate::syscall::poll_eintr_safe(&mut pfds, timeout).map_err(Error::Io)?;
    }
    Ok(self.next_batch())
}
/// Reports whether the RX ring's cached entry count is zero.
#[cfg(feature = "af-xdp")]
fn rx_is_empty(&self) -> bool {
    matches!(self.rx.cached_count(), 0)
}
/// Copies up to 64 received packets out of the RX ring.
///
/// Completed TX frames are recycled first. Each descriptor's payload is copied
/// into an owned packet whose metadata fields are left at default/zero values.
/// A descriptor whose address/length fails the UMEM bounds check is logged and
/// skipped. In both cases the frame is returned to the UMEM. Afterwards the RX
/// entries are released and the fill ring is topped up.
///
/// Returns an empty vector when nothing is pending.
///
/// # Errors
///
/// The current implementation has no failing path and always returns `Ok`.
#[cfg(feature = "af-xdp")]
pub fn recv(&mut self) -> Result<Vec<OwnedPacket>, Error> {
    self.recycle_completed();

    let Some(tok) = self.rx.consumer_peek(64) else {
        return Ok(Vec::new());
    };

    let mut packets = Vec::with_capacity(tok.n as usize);
    for slot in 0..tok.n {
        let desc: libc::xdp_desc = self.rx.read_at(tok, slot);
        let wire_len = desc.len as usize;

        if let Some(payload) = self.umem.data_checked(desc.addr, wire_len) {
            packets.push(OwnedPacket {
                data: payload.to_vec(),
                timestamp: Timestamp::default(),
                original_len: wire_len,
                status: crate::packet::PacketStatus::default(),
                direction: crate::packet::PacketDirection::Unknown(0),
                rxhash: 0,
                vlan_tci: 0,
                vlan_tpid: 0,
                ll_protocol: 0,
                source_ll_addr: [0u8; 8],
                source_ll_addr_len: 0,
            });
        } else {
            tracing::warn!(
                addr = desc.addr,
                len = desc.len,
                "AF_XDP: malformed RX descriptor; skipping"
            );
        }

        // The frame goes back to the pool whether or not it was usable.
        self.umem.free_frame(desc.addr);
    }

    self.rx.consumer_release(tok);
    self.refill();
    Ok(packets)
}
/// Queues one packet on the TX ring.
///
/// The payload is copied into a freshly allocated UMEM frame and a descriptor
/// for it is submitted. Call `flush` afterwards to kick the kernel.
///
/// Returns `Ok(true)` when the packet was queued and `Ok(false)` when no frame
/// or no TX slot was available (the frame is released again in the latter
/// case).
///
/// # Errors
///
/// Returns [`Error::Config`] when `data` is longer than the UMEM frame size.
///
/// # Panics
///
/// Panics if the UMEM rejects the address/length pair despite the size check.
#[cfg(feature = "af-xdp")]
pub fn send(&mut self, data: &[u8]) -> Result<bool, Error> {
    let frame_size = self.umem.frame_size();
    if data.len() > frame_size {
        return Err(Error::Config(format!(
            "packet {} bytes exceeds frame size {}",
            data.len(),
            frame_size
        )));
    }

    // Reclaim finished transmissions first so their frames can be reused.
    self.recycle_completed();

    let Some(addr) = self.umem.alloc_frame() else {
        return Ok(false);
    };
    self.umem
        .data_mut_checked(addr, data.len())
        .expect("send: frame_size pre-check guarantees fit")
        .copy_from_slice(data);

    let Some(tok) = self.tx.producer_reserve(1) else {
        // TX ring is full: hand the frame back instead of leaking it.
        self.umem.free_frame(addr);
        return Ok(false);
    };

    let desc = libc::xdp_desc {
        addr,
        len: data.len() as u32,
        options: 0,
    };
    self.tx.write_at(tok, 0, desc);
    self.tx.producer_submit(tok);
    Ok(true)
}
/// Kicks the kernel so that queued TX descriptors get processed.
///
/// When need-wakeup was enabled at build time, the kick is skipped unless the
/// TX ring reports that a wakeup is needed.
///
/// # Errors
///
/// Returns [`Error::Io`] when the kick syscall fails.
#[cfg(feature = "af-xdp")]
pub fn flush(&self) -> Result<(), Error> {
    let kick_required = !self.need_wakeup_enabled || self.tx.needs_wakeup();
    if kick_required {
        crate::syscall::sendto_kick_eintr_safe(self.fd.as_raw_fd(), libc::MSG_DONTWAIT)
            .map_err(Error::Io)
    } else {
        Ok(())
    }
}
/// Waits up to `timeout` for the socket to become readable.
///
/// Returns `true` when the poll call reported at least one ready descriptor.
///
/// # Errors
///
/// Returns [`Error::Io`] when polling fails.
#[cfg(feature = "af-xdp")]
pub fn poll(&self, timeout: Duration) -> Result<bool, Error> {
    let readable = nix::poll::PollFlags::POLLIN;
    let mut pfds = [nix::poll::PollFd::new(self.fd.as_fd(), readable)];
    crate::syscall::poll_eintr_safe(&mut pfds, timeout)
        .map(|ready| ready > 0)
        .map_err(Error::Io)
}
/// Queries the socket's statistics and converts them to [`XdpStats`].
///
/// # Errors
///
/// Propagates the error returned by the underlying statistics query.
#[cfg(feature = "af-xdp")]
pub fn statistics(&self) -> Result<XdpStats, Error> {
    let raw = socket::get_statistics(self.fd.as_fd())?;
    Ok(XdpStats::from(raw))
}
/// Returns up to 64 frames from the completion ring to the UMEM.
///
/// Does nothing when the completion ring has no entries to consume.
#[cfg(feature = "af-xdp")]
fn recycle_completed(&mut self) {
    let Some(tok) = self.comp.consumer_peek(64) else {
        return;
    };

    // Stack scratch buffer sized to the peek limit above.
    let done = tok.n as usize;
    let mut scratch = [0u64; 64];
    for (slot, i) in scratch[..done].iter_mut().zip(0..tok.n) {
        *slot = self.comp.read_at(tok, i);
    }

    self.umem.free_frames(&scratch[..done]);
    self.comp.consumer_release(tok);
}
/// Moves up to 64 free UMEM frames onto the fill ring.
///
/// Does nothing when the UMEM has no free frames or the fill ring grants no
/// reservation.
///
/// NOTE(review): the amount is taken from the whole free list, not from the
/// number of frames just consumed. Repeated calls can therefore shrink the
/// pool that `send` allocates from. Confirm this is the intended policy for
/// `XdpMode::RxTx` and `XdpMode::Custom`.
#[cfg(feature = "af-xdp")]
fn refill(&mut self) {
    let want = self.umem.available().min(64) as u32;
    let reservation = if want == 0 {
        None
    } else {
        self.fill.producer_reserve(want)
    };
    let Some(tok) = reservation else {
        return;
    };

    let mut filled = 0u32;
    for slot in 0..tok.n {
        if let Some(addr) = self.umem.alloc_frame() {
            self.fill.write_at(tok, slot, addr);
            filled += 1;
        }
    }

    // `want` was bounded by the free-frame count, so every slot should be set.
    debug_assert_eq!(filled, tok.n);
    if filled > 0 {
        self.fill.producer_submit(tok);
    }
}
}
/// Lends out the underlying socket descriptor, e.g. for external polling.
#[cfg(feature = "af-xdp")]
impl AsFd for XdpSocket {
    fn as_fd(&self) -> BorrowedFd<'_> {
        AsFd::as_fd(&self.fd)
    }
}
/// Exposes the raw descriptor number of the underlying socket.
#[cfg(feature = "af-xdp")]
impl AsRawFd for XdpSocket {
    fn as_raw_fd(&self) -> std::os::fd::RawFd {
        AsRawFd::as_raw_fd(&self.fd)
    }
}
// SAFETY: this asserts that an `XdpSocket` may be moved to another thread.
// NOTE(review): the justification depends on the ring and UMEM types, which are
// defined in the `ring` and `umem` modules and not visible here. Presumably
// they hold raw pointers into mappings owned exclusively by this socket;
// TODO confirm and record the invariant. `Sync` is not implemented in this file.
#[cfg(feature = "af-xdp")]
unsafe impl Send for XdpSocket {}
/// Debug output shows the UMEM frame size and free-frame count when the
/// `af-xdp` feature is enabled, and an empty struct otherwise.
impl std::fmt::Debug for XdpSocket {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("XdpSocket");
        #[cfg(feature = "af-xdp")]
        {
            let umem = &self.umem;
            out.field("frame_size", &umem.frame_size())
                .field("umem_available", &umem.available());
        }
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Building without an interface must fail with a configuration error,
    /// with or without the `af-xdp` feature.
    #[test]
    fn builder_rejects_missing_interface() {
        let err = XdpSocketBuilder::default().build().unwrap_err();
        assert!(matches!(err, Error::Config(_)));
    }

    /// Pins every default value, then checks that only the interface is
    /// missing for the default configuration to validate.
    #[test]
    fn builder_defaults() {
        let b = XdpSocketBuilder::default();
        assert!(b.interface.is_none());
        assert_eq!(b.queue_id, 0);
        assert_eq!(b.frame_size, 4096);
        assert_eq!(b.frame_count, 4096);
        assert!(b.need_wakeup);
        assert_eq!(b.mode, XdpMode::RxTx);
        assert_eq!(b.shared_umem_fd, 0);

        assert!(b.validate().is_err());
        let b = b.interface("lo");
        assert!(b.validate().is_ok());
    }

    #[test]
    fn builder_validate_ok() {
        let b = XdpSocketBuilder::default().interface("lo");
        assert!(b.validate().is_ok());
    }

    #[test]
    fn builder_validate_zero_frame_size() {
        let b = XdpSocketBuilder::default().interface("lo").frame_size(0);
        assert!(b.validate().is_err());
    }

    #[test]
    fn builder_validate_zero_frame_count() {
        let b = XdpSocketBuilder::default().interface("lo").frame_count(0);
        assert!(b.validate().is_err());
    }

    /// Each chained setter must store its argument in the matching field.
    #[test]
    fn builder_chaining() {
        let b = XdpSocketBuilder::default()
            .interface("eth0")
            .queue_id(3)
            .frame_size(2048)
            .frame_count(1024)
            .need_wakeup(false);
        assert_eq!(b.queue_id, 3);
        assert_eq!(b.frame_size, 2048);
        assert_eq!(b.frame_count, 1024);
        assert!(!b.need_wakeup);
        assert_eq!(b.validate().unwrap(), "eth0");
    }

    #[test]
    fn builder_default_mode_is_rxtx() {
        let b = XdpSocketBuilder::default();
        assert_eq!(b.mode, XdpMode::RxTx);
    }

    #[test]
    fn builder_mode_setter() {
        let b = XdpSocketBuilder::default().mode(XdpMode::Tx);
        assert_eq!(b.mode, XdpMode::Tx);
        let b = XdpSocketBuilder::default().mode(XdpMode::Custom { prefill: 256 });
        assert_eq!(b.mode, XdpMode::Custom { prefill: 256 });
    }

    /// Local copy of the prefill computation performed inside
    /// `XdpSocketBuilder::build`; keep the two in sync.
    fn compute_prefill(mode: XdpMode, available: usize, ring_size: usize) -> u32 {
        let cap_avail = available.min(ring_size);
        let n = match mode {
            XdpMode::Rx => cap_avail,
            XdpMode::Tx => 0,
            XdpMode::RxTx => cap_avail / 2,
            XdpMode::Custom { prefill } => (prefill as usize).min(cap_avail),
        };
        n as u32
    }

    #[test]
    fn prefill_tx_keeps_all_frames_in_free_list() {
        assert_eq!(compute_prefill(XdpMode::Tx, 4096, 4096), 0);
    }

    #[test]
    fn prefill_rx_consumes_all_frames() {
        assert_eq!(compute_prefill(XdpMode::Rx, 4096, 4096), 4096);
    }

    #[test]
    fn prefill_rxtx_splits_in_half() {
        assert_eq!(compute_prefill(XdpMode::RxTx, 4096, 4096), 2048);
    }

    #[test]
    fn prefill_custom_clamped() {
        assert_eq!(
            compute_prefill(XdpMode::Custom { prefill: 100 }, 4096, 4096),
            100
        );
        assert_eq!(
            compute_prefill(XdpMode::Custom { prefill: 8192 }, 4096, 4096),
            4096
        );
        assert_eq!(
            compute_prefill(XdpMode::Custom { prefill: 1000 }, 64, 4096),
            64
        );
    }
}