use smoltcp::phy::{self, Checksum, ChecksumCapabilities, Device, DeviceCapabilities, Medium};
use smoltcp::time::Instant;
use crate::ffi;
use crate::port::ChecksumOffloads;
/// Maximum number of mbufs requested per RX burst. Also sizes each port's
/// `rx_buf` staging array and is reported to smoltcp as `max_burst_size`.
const BURST_SIZE: usize = 64;
/// MTU reported to smoltcp until `DpdkDevice::set_mtu` overrides it.
const DEFAULT_MTU: usize = 1500;
/// RX state for one DPDK port/queue pair: the mbufs returned by the most
/// recent burst and how many of them have already been handed out.
struct RxPort {
    /// DPDK port this entry polls.
    port_id: u16,
    /// RX queue on `port_id`.
    queue_id: u16,
    /// Index of the next staged mbuf in `rx_buf` to hand out.
    rx_cursor: usize,
    /// Number of valid entries in `rx_buf` from the last burst.
    rx_count: usize,
    /// Landing area that `dpdk_eth_rx_burst` fills.
    rx_buf: [*mut ffi::rte_mbuf; BURST_SIZE],
}
/// smoltcp [`Device`] backed by DPDK.
///
/// It receives from one or more ports and transmits on a single port/queue.
pub struct DpdkDevice {
    // ---- receive side ----
    /// Per-port RX staging state.
    rx_ports: Vec<RxPort>,
    /// Index into `rx_ports` of the port `receive` currently reads from.
    active_rx: usize,
    /// Frames queued through `inject_rx`. They are handed out before NIC frames.
    inject_queue: Vec<Vec<u8>>,
    /// Reusable mbuf vector returned by `RxBatch::recycle`.
    batch_mbufs: Vec<*mut ffi::rte_mbuf>,
    /// Reusable injected-frame vector returned by `RxBatch::recycle`.
    batch_injected: Vec<Vec<u8>>,

    // ---- neighbour learning ----
    /// (source MAC, source IPv4) pairs seen on RX, drained by `take_learned_neighbors`.
    learned_neighbors: Vec<([u8; 6], [u8; 4])>,
    /// When each source IPv4 address was last pushed to `learned_neighbors`.
    known_neighbors: rustc_hash::FxHashMap<[u8; 4], std::time::Instant>,

    // ---- transmit side ----
    tx_port_id: u16,
    tx_queue_id: u16,
    /// Pool that TX mbufs are allocated from.
    mempool: *mut ffi::rte_mempool,
    /// Mbufs waiting for `flush_tx`.
    tx_batch: Vec<*mut ffi::rte_mbuf>,
    /// Offload flags cached at construction time.
    tx_ol_flags: u64,
    /// VLAN ID to insert on TX; 0 means no insertion.
    tx_vlan_id: u16,

    // ---- capabilities reported to smoltcp ----
    mtu: usize,
    offloads: ChecksumOffloads,
}

// SAFETY: the raw mbuf/mempool pointers are only dereferenced through
// `&mut self`, so a moved device is never used from two threads at once.
// NOTE(review): this assumes DPDK permits using these queues and the mempool
// from whichever thread currently owns the device — confirm against the
// lcore/queue assignment.
unsafe impl Send for DpdkDevice {}
impl DpdkDevice {
pub fn new(
port_ids: &[u16],
mempool: *mut ffi::rte_mempool,
offloads: ChecksumOffloads,
queue_id: u16,
) -> Self {
assert!(!port_ids.is_empty(), "at least one DPDK port required");
let mut tx_ol_flags: u64 = 0;
if offloads.tx_ip {
tx_ol_flags |= unsafe { ffi::dpdk_tx_offload_ipv4_cksum() };
}
if offloads.tx_tcp {
tx_ol_flags |= unsafe { ffi::dpdk_tx_offload_tcp_cksum() };
}
if tx_ol_flags != 0 {
tracing::info!("DPDK TX checksum offload enabled (flags=0x{tx_ol_flags:x})");
}
let rx_ports = port_ids
.iter()
.map(|&port_id| RxPort {
port_id,
queue_id,
rx_buf: [std::ptr::null_mut(); BURST_SIZE],
rx_count: 0,
rx_cursor: 0,
})
.collect();
DpdkDevice {
rx_ports,
active_rx: 0,
tx_port_id: port_ids[0],
tx_queue_id: queue_id,
mempool,
mtu: DEFAULT_MTU,
offloads,
tx_ol_flags,
tx_vlan_id: 0,
inject_queue: Vec::new(),
learned_neighbors: Vec::new(),
known_neighbors: rustc_hash::FxHashMap::with_capacity_and_hasher(
64,
Default::default(),
),
batch_mbufs: Vec::with_capacity(BURST_SIZE * port_ids.len()),
batch_injected: Vec::new(),
tx_batch: Vec::with_capacity(BURST_SIZE),
}
}
pub fn flush_tx(&mut self) {
if self.tx_batch.is_empty() {
return;
}
let count = self.tx_batch.len();
let sent = unsafe {
ffi::dpdk_eth_tx_burst(
self.tx_port_id,
self.tx_queue_id,
self.tx_batch.as_mut_ptr(),
count as u16,
)
} as usize;
if sent == count {
self.tx_batch.clear();
} else {
self.tx_batch.drain(..sent);
}
}
pub fn poll_rx(&mut self) {
let active = &self.rx_ports[self.active_rx];
if active.rx_cursor < active.rx_count {
return;
}
let n = self.rx_ports.len();
for i in 0..n {
let idx = (self.active_rx + i) % n;
let port = &mut self.rx_ports[idx];
let count = unsafe {
ffi::dpdk_eth_rx_burst(
port.port_id,
port.queue_id,
port.rx_buf.as_mut_ptr(),
BURST_SIZE as u16,
)
};
if count > 0 {
port.rx_count = count as usize;
port.rx_cursor = 0;
self.active_rx = idx;
return;
}
}
}
pub fn set_mtu(&mut self, mtu: usize) {
self.mtu = mtu;
}
pub fn collect_rx_batch(&mut self) -> RxBatch {
let mut mbufs = std::mem::take(&mut self.batch_mbufs);
mbufs.clear();
let mut now: Option<std::time::Instant> = None;
for port in &mut self.rx_ports {
let count = unsafe {
ffi::dpdk_eth_rx_burst(
port.port_id,
port.queue_id,
port.rx_buf.as_mut_ptr(),
BURST_SIZE as u16,
)
};
for i in 0..count as usize {
let mbuf = port.rx_buf[i];
let (data_ptr, data_len) = unsafe {
let buf_addr = ffi::dpdk_mbuf_buf_addr(mbuf).cast::<u8>();
let data_off = ffi::dpdk_mbuf_data_off(mbuf) as usize;
(
buf_addr.add(data_off),
ffi::dpdk_mbuf_data_len(mbuf) as usize,
)
};
if data_len >= 34 {
let data = unsafe { std::slice::from_raw_parts(data_ptr, data_len) };
if data[12] == 0x08 && data[13] == 0x00 {
let mut src_mac = [0u8; 6];
src_mac.copy_from_slice(&data[6..12]);
let mut src_ip = [0u8; 4];
src_ip.copy_from_slice(&data[26..30]);
const RESEED_SECS: u64 = 30;
let now = *now.get_or_insert_with(std::time::Instant::now);
let needs_seed = match self.known_neighbors.get_mut(&src_ip) {
Some(last) => {
if now.duration_since(*last).as_secs() >= RESEED_SECS {
*last = now;
true
} else {
false
}
}
None => {
self.known_neighbors.insert(src_ip, now);
true
}
};
if needs_seed {
self.learned_neighbors.push((src_mac, src_ip));
}
}
}
mbufs.push(mbuf);
}
port.rx_count = 0;
port.rx_cursor = 0;
}
let injected = std::mem::take(&mut self.inject_queue);
RxBatch { mbufs, injected }
}
pub fn set_vlan_id(&mut self, vlan_id: u16) {
self.tx_vlan_id = vlan_id;
self.tx_ol_flags |= unsafe { ffi::dpdk_tx_vlan_flag() };
tracing::info!(vlan_id, "DPDK TX VLAN insert enabled");
}
pub fn send_raw_frame(&mut self, frame: &[u8]) {
let mbuf = unsafe { ffi::dpdk_pktmbuf_alloc(self.mempool) };
assert!(!mbuf.is_null(), "mbuf alloc failed for raw frame TX");
unsafe {
let buf_addr = ffi::dpdk_mbuf_buf_addr(mbuf).cast::<u8>();
let data_off = ffi::dpdk_mbuf_data_off(mbuf) as usize;
let data_ptr = buf_addr.add(data_off);
std::ptr::copy_nonoverlapping(frame.as_ptr(), data_ptr, frame.len());
ffi::dpdk_mbuf_set_data_len(mbuf, frame.len() as u16);
ffi::dpdk_mbuf_set_pkt_len(mbuf, frame.len() as u32);
if self.tx_vlan_id != 0 {
ffi::dpdk_mbuf_set_ol_flags(mbuf, ffi::dpdk_tx_vlan_flag());
ffi::dpdk_mbuf_set_vlan_tci(mbuf, self.tx_vlan_id);
}
}
self.tx_batch.push(mbuf);
self.flush_tx();
}
pub fn inject_rx(&mut self, frame: Vec<u8>) {
self.inject_queue.push(frame);
}
pub fn take_learned_neighbors(&mut self) -> Vec<([u8; 6], [u8; 4])> {
std::mem::take(&mut self.learned_neighbors)
}
pub fn capabilities(&self) -> DeviceCapabilities {
let mut caps = DeviceCapabilities::default();
caps.medium = Medium::Ethernet;
caps.max_transmission_unit = self.mtu;
caps.max_burst_size = Some(BURST_SIZE);
let mut checksums = ChecksumCapabilities::default();
if self.offloads.rx_ip && self.offloads.tx_ip {
checksums.ipv4 = Checksum::None;
} else if self.offloads.tx_ip {
checksums.ipv4 = Checksum::Rx; } else if self.offloads.rx_ip {
checksums.ipv4 = Checksum::Tx; }
if self.offloads.rx_tcp && self.offloads.tx_tcp {
checksums.tcp = Checksum::None;
} else if self.offloads.tx_tcp {
checksums.tcp = Checksum::Rx;
} else if self.offloads.rx_tcp {
checksums.tcp = Checksum::Tx;
}
caps.checksum = checksums;
caps
}
}
impl Device for DpdkDevice {
    type RxToken<'a> = DpdkRxToken;
    type TxToken<'a> = DpdkTxToken<'a>;

    /// Yields the next received frame together with a TX token.
    ///
    /// Injected frames come first, oldest first. This matches the order
    /// `RxBatch::write_slices` uses and preserves the order frames were
    /// injected in; the previous `pop()` delivered them newest-first.
    ///
    /// Otherwise the next mbuf staged by `poll_rx` on the active port is
    /// returned. This method never polls the NIC itself; it returns `None`
    /// once nothing is staged.
    fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
        let rx_token = if self.inject_queue.is_empty() {
            let active = &mut self.rx_ports[self.active_rx];
            if active.rx_cursor >= active.rx_count {
                return None;
            }
            let mbuf = active.rx_buf[active.rx_cursor];
            active.rx_cursor += 1;
            // SAFETY: `mbuf` was staged by `dpdk_eth_rx_burst` and is still
            // owned by the device. Ownership moves to the token, which frees it.
            let (data_ptr, data_len) = unsafe {
                let buf_addr = ffi::dpdk_mbuf_buf_addr(mbuf).cast::<u8>();
                let data_off = ffi::dpdk_mbuf_data_off(mbuf) as usize;
                let ptr = buf_addr.add(data_off);
                let len = ffi::dpdk_mbuf_data_len(mbuf) as usize;
                (ptr, len)
            };
            DpdkRxToken::Mbuf {
                mbuf,
                data_ptr: data_ptr as *const u8,
                data_len,
            }
        } else {
            // FIFO delivery. `remove(0)` is O(n), but the inject queue is
            // presumably short. NOTE(review): switch to VecDeque if it is not.
            DpdkRxToken::Injected(self.inject_queue.remove(0))
        };
        let tx_token = DpdkTxToken {
            mempool: self.mempool,
            tx_ol_flags: self.tx_ol_flags,
            tx_vlan_id: self.tx_vlan_id,
            tx_batch: &mut self.tx_batch,
        };
        Some((rx_token, tx_token))
    }

    /// Always grants a TX token. The mbuf is allocated only when the token is consumed.
    fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
        Some(DpdkTxToken {
            mempool: self.mempool,
            tx_ol_flags: self.tx_ol_flags,
            tx_vlan_id: self.tx_vlan_id,
            tx_batch: &mut self.tx_batch,
        })
    }

    /// Delegates to the inherent `DpdkDevice::capabilities`.
    fn capabilities(&self) -> DeviceCapabilities {
        self.capabilities()
    }
}
/// RX token handed to smoltcp.
///
/// It wraps either a frame injected by software or an mbuf received from the
/// NIC; the mbuf is freed after consumption.
pub enum DpdkRxToken {
    /// Frame queued through `DpdkDevice::inject_rx`.
    Injected(Vec<u8>),
    /// Frame received from the NIC. `data_ptr`/`data_len` describe the mbuf's
    /// data segment.
    Mbuf {
        mbuf: *mut ffi::rte_mbuf,
        data_ptr: *const u8,
        data_len: usize,
    },
}
impl phy::RxToken for DpdkRxToken {
    /// Lends the frame bytes to `f`. An mbuf-backed frame is freed once `f` returns.
    fn consume<R, F>(self, f: F) -> R
    where
        F: FnOnce(&[u8]) -> R,
    {
        let (mbuf, data_ptr, data_len) = match self {
            DpdkRxToken::Injected(frame) => return f(&frame),
            DpdkRxToken::Mbuf {
                mbuf,
                data_ptr,
                data_len,
            } => (mbuf, data_ptr, data_len),
        };
        // SAFETY: `data_ptr`/`data_len` describe the data segment of `mbuf`,
        // which this token exclusively owns and has not freed yet.
        let bytes = unsafe { std::slice::from_raw_parts(data_ptr, data_len) };
        let outcome = f(bytes);
        // SAFETY: `bytes` is no longer used, and the token owned the mbuf.
        unsafe { ffi::dpdk_pktmbuf_free(mbuf) };
        outcome
    }
}
/// TX token handed to smoltcp.
///
/// Consuming it allocates one mbuf from `mempool` and, once the frame is
/// written, appends the mbuf to the device's pending TX batch.
pub struct DpdkTxToken<'a> {
    /// The device's pending TX batch that the filled mbuf is pushed onto.
    tx_batch: &'a mut Vec<*mut ffi::rte_mbuf>,
    /// Pool the frame's mbuf is allocated from.
    mempool: *mut ffi::rte_mempool,
    /// Snapshot of the device's cached offload flags.
    tx_ol_flags: u64,
    /// VLAN ID to insert; 0 disables insertion.
    tx_vlan_id: u16,
}
impl<'a> phy::TxToken for DpdkTxToken<'a> {
    /// Builds one outgoing frame and queues it for the next `DpdkDevice::flush_tx`.
    ///
    /// Steps:
    /// 1. Allocate an mbuf and let `f` write a `len`-byte frame into it.
    /// 2. Attach checksum-offload and VLAN metadata.
    /// 3. Push the mbuf onto the device's TX batch.
    ///
    /// Checksum offload is requested per frame, only for offloads enabled in
    /// `tx_ol_flags`, and only where they apply:
    /// - the IPv4 header checksum for any untagged IPv4 frame;
    /// - the TCP checksum only for IPv4/TCP frames. In that case the
    ///   pseudo-header sum is pre-seeded into the TCP checksum field.
    ///
    /// Bits outside those offloads (e.g. the VLAN-insert flag) never trigger
    /// the pseudo-header rewrite. A software-computed TCP checksum is
    /// therefore never clobbered.
    ///
    /// # Panics
    ///
    /// Panics if the mempool has no free mbuf.
    fn consume<R, F>(self, len: usize, f: F) -> R
    where
        F: FnOnce(&mut [u8]) -> R,
    {
        /// Untagged Ethernet header length.
        const L2_LEN: usize = 14;
        /// IPv4 header length without options.
        /// NOTE(review): assumes smoltcp emits no IPv4 options.
        const L3_LEN: usize = 20;
        /// Minimum TCP header length.
        const L4_MIN_LEN: usize = 20;
        /// Offset of the TCP checksum field within the frame.
        const TCP_CKSUM_OFF: usize = L2_LEN + L3_LEN + 16;

        // SAFETY: `mempool` is the pool the device was created with.
        let mbuf = unsafe { ffi::dpdk_pktmbuf_alloc(self.mempool) };
        assert!(!mbuf.is_null(), "mbuf alloc failed — mempool exhausted");
        // SAFETY: `mbuf` is a freshly allocated, non-null mbuf.
        let data_ptr = unsafe {
            let buf_addr = ffi::dpdk_mbuf_buf_addr(mbuf).cast::<u8>();
            let data_off = ffi::dpdk_mbuf_data_off(mbuf) as usize;
            buf_addr.add(data_off)
        };
        // SAFETY: NOTE(review): assumes `len` (bounded by the MTU smoltcp is
        // given) fits in the mbuf's tailroom; nothing here checks it.
        let result = f(unsafe { std::slice::from_raw_parts_mut(data_ptr, len) });
        // SAFETY: same region as above; the borrow handed to `f` has ended.
        let frame = unsafe { std::slice::from_raw_parts_mut(data_ptr, len) };

        let tx_ol_flags = self.tx_ol_flags;
        // True when every bit of `flags` was enabled at construction time.
        let enabled = |flags: u64| flags != 0 && (tx_ol_flags & flags) == flags;

        // SAFETY: `mbuf` is valid and exclusively owned here.
        unsafe {
            ffi::dpdk_mbuf_set_data_len(mbuf, len as u16);
            ffi::dpdk_mbuf_set_pkt_len(mbuf, len as u32);

            // EtherType 0x0800 = IPv4.
            let is_ipv4 = len >= L2_LEN + L3_LEN && frame[12] == 0x08 && frame[13] == 0x00;
            if is_ipv4 {
                let ip_flags = ffi::dpdk_tx_offload_ipv4_cksum();
                let tcp_flags = ffi::dpdk_tx_offload_tcp_cksum();
                let mut ol_flags = 0u64;
                // smoltcp skips the IPv4 header checksum when IP TX offload is
                // advertised, so the NIC must fill it for every IPv4 frame.
                if enabled(ip_flags) {
                    ol_flags |= ip_flags;
                }
                // IPv4 protocol number 6 = TCP.
                let is_tcp = len >= L2_LEN + L3_LEN + L4_MIN_LEN && frame[23] == 6;
                if is_tcp && enabled(tcp_flags) {
                    let phdr_cksum = ipv4_pseudo_header_checksum(frame);
                    frame[TCP_CKSUM_OFF..TCP_CKSUM_OFF + 2]
                        .copy_from_slice(&phdr_cksum.to_ne_bytes());
                    ol_flags |= tcp_flags;
                }
                if ol_flags != 0 {
                    ffi::dpdk_mbuf_set_ol_flags(mbuf, ol_flags);
                    ffi::dpdk_mbuf_set_tx_offload(mbuf, 14, 20, 0);
                }
            }
            if self.tx_vlan_id != 0 {
                let current_flags = ffi::dpdk_mbuf_ol_flags(mbuf);
                if current_flags & ffi::dpdk_tx_vlan_flag() == 0 {
                    ffi::dpdk_mbuf_set_ol_flags(mbuf, current_flags | ffi::dpdk_tx_vlan_flag());
                }
                ffi::dpdk_mbuf_set_vlan_tci(mbuf, self.tx_vlan_id);
            }
        }
        self.tx_batch.push(mbuf);
        result
    }
}
/// Computes the IPv4/TCP pseudo-header sum for an Ethernet frame.
///
/// The frame is assumed to be untagged, with an option-less IPv4 header.
/// The addresses are read from bytes 26..34, and the TCP length is taken as
/// everything after byte 34.
///
/// The result is the folded ones'-complement sum and is NOT inverted. It is
/// computed over native-endian words; since the ones'-complement sum is
/// byte-order independent, writing it back with `to_ne_bytes` produces the
/// correct on-wire bytes.
#[inline(always)]
fn ipv4_pseudo_header_checksum(frame: &[u8]) -> u16 {
    let segment_len = (frame.len() - 34) as u16;
    let word = |hi: u8, lo: u8| u32::from(u16::from_ne_bytes([hi, lo]));
    let address_sum: u32 = frame[26..34]
        .chunks_exact(2)
        .map(|pair| word(pair[0], pair[1]))
        .sum();
    let [len_hi, len_lo] = segment_len.to_be_bytes();
    // Protocol word: zero byte followed by IPPROTO_TCP (6).
    let mut acc = address_sum + word(0, 6) + word(len_hi, len_lo);
    // Two end-around-carry folds are always enough for six 16-bit words.
    for _ in 0..2 {
        acc = (acc & 0xFFFF) + (acc >> 16);
    }
    acc as u16
}
/// A burst of received frames from all ports plus any injected frames.
///
/// The batch owns its mbufs and frees them in `recycle` or on drop.
pub struct RxBatch {
    /// Frames queued through `DpdkDevice::inject_rx`.
    injected: Vec<Vec<u8>>,
    /// Received mbufs owned by this batch.
    mbufs: Vec<*mut ffi::rte_mbuf>,
}

// SAFETY: the batch exclusively owns its mbufs.
// NOTE(review): assumes DPDK allows reading and freeing those mbufs from the
// thread the batch is sent to — confirm.
unsafe impl Send for RxBatch {}
impl RxBatch {
    /// Returns the data segment of `mbuf` as a slice.
    ///
    /// The slice's lifetime is tied to this batch.
    fn mbuf_bytes(&self, mbuf: *mut ffi::rte_mbuf) -> &[u8] {
        // SAFETY: every pointer in `self.mbufs` came from `dpdk_eth_rx_burst`.
        // It stays allocated until the batch is recycled or dropped, and
        // neither can happen while `&self` is borrowed.
        unsafe {
            let start = ffi::dpdk_mbuf_buf_addr(mbuf)
                .cast::<u8>()
                .add(ffi::dpdk_mbuf_data_off(mbuf) as usize);
            std::slice::from_raw_parts(start, ffi::dpdk_mbuf_data_len(mbuf) as usize)
        }
    }

    /// Total number of frames: injected plus received.
    pub fn len(&self) -> usize {
        self.injected.len() + self.mbufs.len()
    }

    /// True when the batch carries no frames at all.
    pub fn is_empty(&self) -> bool {
        self.injected.is_empty() && self.mbufs.is_empty()
    }

    /// Moves every frame currently queued in `device.inject_queue` into this batch.
    pub fn append_injected(&mut self, device: &mut DpdkDevice) {
        self.injected.append(&mut device.inject_queue);
    }

    /// Writes a slice for each frame into `out`: injected frames first, then mbufs.
    ///
    /// Returns how many slots were written.
    ///
    /// # Panics
    ///
    /// Panics if `out` is shorter than `self.len()`.
    pub fn write_slices<'a>(&'a self, out: &mut [std::mem::MaybeUninit<&'a [u8]>]) -> usize {
        let frames = self
            .injected
            .iter()
            .map(Vec::as_slice)
            .chain(self.mbufs.iter().map(|&mbuf| self.mbuf_bytes(mbuf)));
        let mut written = 0;
        for frame in frames {
            out[written] = std::mem::MaybeUninit::new(frame);
            written += 1;
        }
        written
    }

    /// Like `write_slices`, but pairs each frame with an opaque handle.
    ///
    /// Injected frames get an all-zero handle; mbuf frames get
    /// `mbuf_to_handle(mbuf)`. Returns how many slots were written.
    ///
    /// # Panics
    ///
    /// Panics if `out` is shorter than `self.len()`.
    pub fn write_slices_with_handles<'a, F>(
        &'a self,
        out: &mut [std::mem::MaybeUninit<(&'a [u8], smoltcp::socket::tcp::OpaqueFrameHandle)>],
        mbuf_to_handle: F,
    ) -> usize
    where
        F: Fn(*mut ffi::rte_mbuf) -> smoltcp::socket::tcp::OpaqueFrameHandle,
    {
        let zero_handle = smoltcp::socket::tcp::OpaqueFrameHandle::from_bytes([0; 16]);
        let tagged = self
            .injected
            .iter()
            .map(|frame| (frame.as_slice(), zero_handle))
            .chain(
                self.mbufs
                    .iter()
                    .map(|&mbuf| (self.mbuf_bytes(mbuf), mbuf_to_handle(mbuf))),
            );
        let mut written = 0;
        for entry in tagged {
            out[written] = std::mem::MaybeUninit::new(entry);
            written += 1;
        }
        written
    }

    /// Frees this batch's mbufs and returns its (emptied) vectors to `device`.
    ///
    /// The vectors' allocations can then be reused by the next batch.
    pub fn recycle(mut self, device: &mut DpdkDevice) {
        let mut mbufs = std::mem::take(&mut self.mbufs);
        for &mbuf in &mbufs {
            // SAFETY: the batch owns each mbuf, and each is freed exactly once.
            // The vector was taken out of `self`, so `Drop` will not free them again.
            unsafe {
                ffi::dpdk_pktmbuf_free(mbuf);
            }
        }
        mbufs.clear();
        device.batch_mbufs = mbufs;

        let mut injected = std::mem::take(&mut self.injected);
        injected.clear();
        device.batch_injected = injected;
    }
}
impl Drop for RxBatch {
    /// Returns any mbufs still owned by the batch to their mempool.
    fn drop(&mut self) {
        self.mbufs.iter().for_each(|&mbuf| {
            // SAFETY: the batch exclusively owns each remaining mbuf. `recycle`
            // empties the vector before this runs, so nothing is freed twice.
            unsafe { ffi::dpdk_pktmbuf_free(mbuf) }
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::mem::MaybeUninit;

    /// Runs `batch` through `write_slices` and returns the initialised slices.
    fn collect_slices(batch: &RxBatch) -> Vec<&[u8]> {
        let mut slots = vec![MaybeUninit::uninit(); batch.len()];
        let filled = batch.write_slices(&mut slots);
        // SAFETY: `write_slices` initialised exactly the first `filled` slots.
        slots[..filled]
            .iter()
            .map(|slot| unsafe { slot.assume_init() })
            .collect()
    }

    /// Builds a batch with no mbufs, only the given injected frames.
    fn injected_batch(frames: Vec<Vec<u8>>) -> RxBatch {
        RxBatch {
            mbufs: Vec::new(),
            injected: frames,
        }
    }

    #[test]
    fn rx_batch_empty() {
        let batch = injected_batch(Vec::new());
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
        assert!(collect_slices(&batch).is_empty());
    }

    #[test]
    fn rx_batch_injected_only() {
        let arp_frame = vec![0xFFu8; 42];
        let tcp_frame = vec![0xAAu8; 60];
        let batch = injected_batch(vec![arp_frame.clone(), tcp_frame.clone()]);
        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 2);
        let slices = collect_slices(&batch);
        assert_eq!(slices.len(), 2);
        assert_eq!(slices[0], &arp_frame[..]);
        assert_eq!(slices[1], &tcp_frame[..]);
    }

    #[test]
    fn rx_batch_injected_ordering() {
        let batch = injected_batch(vec![vec![1, 2, 3], vec![4, 5, 6]]);
        let slices = collect_slices(&batch);
        assert_eq!(slices[0], &[1, 2, 3]);
        assert_eq!(slices[1], &[4, 5, 6]);
    }
}