use std::net::Ipv4Addr;
use std::sync::Arc;
use crate::ffi;
use smoltcp::iface::{Config, Interface, SocketHandle, SocketSet};
use smoltcp::socket::tcp::{self, OpaqueFrameHandle, State};
use smoltcp::time::Instant;
use smoltcp::wire::{EthernetAddress, HardwareAddress, IpAddress, IpCidr, Ipv4Address};
use crate::device::DpdkDevice;
use crate::eal::Eal;
use crate::mempool::Mempool;
use crate::port::{ChecksumOffloads, Port};
/// Applies the latency-oriented TCP options shared by every socket this transport creates.
///
/// Nagle's algorithm and delayed ACKs are turned off, the retransmission timeout is given a
/// 1 ms floor and a 50 ms starting value, and the initial congestion window is set to 64 KiB.
fn tune_socket(socket: &mut tcp::Socket<'_>) {
    use smoltcp::time::Duration;

    const MIN_RTO_MS: u64 = 1;
    const INITIAL_RTO_MS: u64 = 50;

    // Latency first: never hold back small segments or ACKs.
    socket.set_nagle_enabled(false);
    socket.set_ack_delay(None);

    // Aggressive retransmission timing.
    socket.set_min_rto(Duration::from_millis(MIN_RTO_MS));
    socket.set_initial_rto(Duration::from_millis(INITIAL_RTO_MS));

    socket.set_initial_congestion_window(64 * 1024);
}
/// Number of sockets the transport reserves room for up front (socket-set capacity and the
/// initial size of the per-socket TX bookkeeping tables).
pub const MAX_CONNECTIONS: usize = 1 << 10;
/// TCP port used by `DpdkConfig::default()` for the initial listener.
const LISTEN_PORT: u16 = 9_876;
/// Number of leading bytes of an [`OpaqueFrameHandle`] that carry the raw `rte_mbuf` pointer.
const MBUF_PTR_BYTES: usize = std::mem::size_of::<usize>();

// `mbuf_to_handle` packs the pointer into a 16-byte handle; make sure it always fits.
const _: () = assert!(MBUF_PTR_BYTES <= 16);

/// Decodes the `rte_mbuf` pointer stored in the leading bytes of `handle` by
/// [`mbuf_to_handle`]. The result may be null.
fn handle_to_mbuf(handle: &OpaqueFrameHandle) -> *mut ffi::rte_mbuf {
    let mut ptr_bytes = [0u8; MBUF_PTR_BYTES];
    ptr_bytes.copy_from_slice(&handle.as_bytes()[..MBUF_PTR_BYTES]);
    usize::from_ne_bytes(ptr_bytes) as *mut ffi::rte_mbuf
}

/// Zero-copy retain hook (registered via `set_zero_copy_retain_fn`): adds one reference to
/// the mbuf behind `handle`. Null handles are ignored.
fn retain_mbuf(handle: OpaqueFrameHandle) {
    let mbuf = handle_to_mbuf(&handle);
    if !mbuf.is_null() {
        // SAFETY: a non-null pointer here was encoded by `mbuf_to_handle` from an mbuf of the
        // RX batch being processed. NOTE(review): relies on smoltcp invoking this hook only
        // while that mbuf is still alive — confirm in the zero-copy fork.
        unsafe { ffi::dpdk_mbuf_refcnt_update(mbuf, 1) };
    }
}

/// Zero-copy release hook (registered via `set_zero_copy_release_fn`): hands the mbuf behind
/// `handle` to `dpdk_pktmbuf_free`, presumably dropping the reference taken by
/// [`retain_mbuf`]. Null handles are ignored.
fn release_mbuf(handle: OpaqueFrameHandle) {
    let mbuf = handle_to_mbuf(&handle);
    if !mbuf.is_null() {
        // SAFETY: the pointer came from `mbuf_to_handle` and was pinned by a prior
        // `retain_mbuf`. NOTE(review): assumes the hooks are called in matched pairs.
        unsafe { ffi::dpdk_pktmbuf_free(mbuf) };
    }
}

/// Packs the raw `mbuf` pointer into the first [`MBUF_PTR_BYTES`] bytes of a zeroed
/// 16-byte [`OpaqueFrameHandle`] (native byte order), for decoding by the hooks above.
fn mbuf_to_handle(mbuf: *mut ffi::rte_mbuf) -> OpaqueFrameHandle {
    let mut bytes = [0u8; 16];
    bytes[..MBUF_PTR_BYTES].copy_from_slice(&(mbuf as usize).to_ne_bytes());
    OpaqueFrameHandle::from_bytes(bytes)
}
/// Default per-connection limit, in bytes, on data buffered by `queue_send` in front of the
/// socket's own TX buffer.
const MAX_TX_QUEUE_SIZE: usize = 64 << 10;
/// Default receive-buffer size, in bytes, for each TCP socket.
const SOCKET_RX_BUF_SIZE: usize = 64 << 10;
/// Default transmit-buffer size, in bytes, for each TCP socket.
const SOCKET_TX_BUF_SIZE: usize = 16 << 10;
/// Number of `poll` calls between wall-clock refreshes of the cached timestamp.
const TIMESTAMP_REFRESH_INTERVAL: u32 = 100;
/// Configuration for bringing up the DPDK ports and the per-queue TCP/IP stack.
#[derive(Clone, Debug)]
pub struct DpdkConfig {
    /// Arguments forwarded to the DPDK EAL initialisation.
    pub eal_args: Vec<String>,
    /// DPDK port ids to drive; the first one supplies the interface MAC address.
    pub port_ids: Vec<u16>,
    /// IPv4 address assigned to the interface.
    pub ip_addr: Ipv4Addr,
    /// Prefix length of the subnet `ip_addr` belongs to.
    pub prefix_len: u8,
    /// Optional default IPv4 gateway.
    pub gateway: Option<Ipv4Addr>,
    /// TCP port of the listener created with each transport.
    pub listen_port: u16,
    /// Link MTU in bytes; values above 1500 use an MTU-sized mbuf pool.
    pub mtu: usize,
    /// Optional VLAN id applied to the ports and devices.
    pub vlan_id: Option<u16>,
    /// Number of queues requested per port.
    pub num_queues: u16,
}
impl Default for DpdkConfig {
    /// One port (id 0) with one queue, address `10.0.0.1/24`, no gateway or VLAN,
    /// a 1500-byte MTU, the listener on `LISTEN_PORT`, and no extra EAL arguments.
    fn default() -> Self {
        Self {
            port_ids: vec![0],
            num_queues: 1,
            ip_addr: Ipv4Addr::new(10, 0, 0, 1),
            prefix_len: 24,
            gateway: None,
            vlan_id: None,
            mtu: 1500,
            listen_port: LISTEN_PORT,
            eal_args: Vec::new(),
        }
    }
}
/// An inbound TCP connection that has completed its handshake on one of the listeners.
///
/// Collected during `poll` and handed out by `DpdkTransport::take_accepted`.
pub struct AcceptedConnection {
    /// Handle of the connected socket, usable with `recv`, `queue_send`, `close`, etc.
    pub handle: SocketHandle,
    /// Remote IPv4 endpoint of the connection.
    pub peer: std::net::SocketAddr,
    /// Local listening port the connection arrived on.
    pub listen_port: u16,
}
/// Bookkeeping for one listening TCP port.
///
/// `handle` names the socket waiting for the next connection. Once that socket is accepted,
/// a replacement listener is created with the same buffer sizes and stored here.
struct ListenerEntry {
    /// Local TCP port being listened on.
    port: u16,
    /// Socket currently waiting for a connection on `port`.
    handle: SocketHandle,
    /// RX buffer size, in bytes, for listener sockets on this port.
    rx_buf_size: usize,
    /// TX buffer size, in bytes, for listener sockets on this port.
    tx_buf_size: usize,
    /// `queue_send` byte limit applied to connections accepted on this port.
    tx_queue_limit: usize,
}
/// DPDK resources shared by every per-queue `DpdkTransport`: EAL, mbuf pool and ports.
///
/// Field order is significant. Rust drops struct fields in declaration order, so `_ports` is
/// dropped first, then `_mempool`, then `_eal` — presumably mirroring DPDK's teardown order
/// (release ports before their pool, clean up the EAL last). Do not reorder these fields.
pub struct DpdkShared {
    // Started ports; held so they stay alive for the lifetime of this value.
    _ports: Vec<Port>,
    // Shared mbuf pool; `mempool_raw` points into it.
    _mempool: Mempool,
    // EAL handle; declared last so it is dropped last.
    _eal: Eal,
    /// Checksum offloads supported by every configured port (intersection across ports).
    pub offloads: ChecksumOffloads,
    /// MAC address of the first configured port.
    pub mac: [u8; 6],
    /// Raw pointer to `_mempool`'s underlying DPDK pool, handed to each `DpdkDevice`.
    pub mempool_raw: *mut crate::ffi::rte_mempool,
    /// Queue count reported by the first port after configuration.
    pub num_queues: u16,
}
// SAFETY: NOTE(review) — at least `mempool_raw` (a raw pointer) blocks the auto-derived
// Send/Sync. Soundness relies on the DPDK pool and the `Port`/`Mempool`/`Eal` wrappers being
// usable from several threads concurrently (each transport runs its own queue). Confirm the
// pool is not created with single-producer/single-consumer flags in `Mempool::create_*`.
unsafe impl Send for DpdkShared {}
unsafe impl Sync for DpdkShared {}
/// Per-queue TCP/IP transport: one smoltcp [`Interface`] driving one [`DpdkDevice`] queue.
pub struct DpdkTransport {
    /// Device bound to this transport's queue.
    device: DpdkDevice,
    /// smoltcp interface (addresses, routes, neighbor cache).
    iface: Interface,
    /// All TCP sockets: listeners, accepted connections and outbound connections.
    sockets: SocketSet<'static>,
    /// One entry per listening port.
    listeners: Vec<ListenerEntry>,
    /// Connections accepted but not yet collected by `take_accepted`.
    accepted: Vec<AcceptedConnection>,
    /// Per-socket application TX queues, indexed by `SocketHandle::index()`.
    tx_queues: Vec<Option<TxQueue>>,
    /// Per-socket byte limits for `tx_queues`, indexed the same way.
    tx_queue_limits: Vec<usize>,
    /// Timestamp handed to smoltcp; refreshed periodically by `poll`.
    cached_timestamp: Instant,
    /// Number of `poll` calls (wrapping).
    poll_count: u32,
    /// Total bytes currently sitting in `tx_queues`.
    pending_tx_bytes: usize,
    // Declared LAST on purpose: fields drop in declaration order. The shared DPDK resources
    // (ports, mbuf pool, EAL) must outlive `device` and `sockets`, which may still hold mbufs
    // from that pool at drop time (zero-copy RX frames are released through `release_mbuf`).
    _shared: Arc<DpdkShared>,
}
/// Application-side byte queue for one connection. It holds data that has not yet fit into
/// the smoltcp socket's TX buffer.
///
/// Sent bytes are skipped with a read cursor instead of being removed immediately, so
/// `advance` is usually O(1). The consumed prefix is reclaimed in bulk.
struct TxQueue {
    /// Backing storage; bytes before `cursor` have already been handed to the socket.
    buf: Vec<u8>,
    /// Offset of the first unsent byte in `buf`.
    cursor: usize,
}

impl TxQueue {
    /// Minimum consumed-prefix size, in bytes, before a partial compaction is worthwhile.
    const COMPACT_MIN_BYTES: usize = 4096;

    /// Creates an empty queue without allocating.
    fn new() -> Self {
        TxQueue {
            buf: Vec::new(),
            cursor: 0,
        }
    }

    /// Bytes still waiting to be sent.
    fn pending(&self) -> &[u8] {
        &self.buf[self.cursor..]
    }

    /// Number of bytes still waiting to be sent.
    fn queued_bytes(&self) -> usize {
        self.buf.len() - self.cursor
    }

    /// Marks the first `n` pending bytes as sent.
    ///
    /// `n` must not exceed [`queued_bytes`](Self::queued_bytes). Otherwise the cursor would
    /// pass the end of the buffer and a later `pending()` would panic.
    fn advance(&mut self, n: usize) {
        debug_assert!(n <= self.queued_bytes(), "TxQueue::advance past end of queue");
        self.cursor += n;
        if self.cursor == self.buf.len() {
            // Fully drained: reset in O(1) and keep the allocation for the next push.
            self.buf.clear();
            self.cursor = 0;
        } else if self.cursor > self.buf.len() / 4 && self.cursor > Self::COMPACT_MIN_BYTES {
            // The dead prefix is large both absolutely and relative to the buffer; shift the
            // remaining bytes down so the buffer does not grow without bound.
            self.buf.drain(..self.cursor);
            self.cursor = 0;
        }
    }

    /// Appends `data` to the end of the queue.
    fn push(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
    }
}
impl DpdkShared {
    /// Initialises the DPDK EAL, one shared mbuf pool and every configured port. The result
    /// is wrapped in an [`Arc`] for sharing between per-queue transports.
    ///
    /// The pool holds 8192 mbufs per (port, queue) pair. It is created with
    /// `Mempool::create_for_mtu` when `config.mtu` exceeds 1500, otherwise with
    /// `Mempool::create_with_size`. The exposed checksum offloads are those supported by all
    /// ports; the MAC address and queue count come from the first port.
    ///
    /// # Errors
    /// Returns an error in any of these cases:
    /// - `config.port_ids` is empty;
    /// - a jumbo MTU does not fit in `u16`;
    /// - the mbuf count overflows `u32`;
    /// - a port id is not present;
    /// - EAL, mempool or port setup fails.
    pub fn init(config: &DpdkConfig) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
        // mbufs reserved per (port, queue) pair.
        const MBUFS_PER_PORT_QUEUE: u32 = 8192;

        // Validate the configuration before touching the EAL. `ports[0]` below needs at least
        // one port, and the jumbo mempool takes its MTU as `u16`.
        if config.port_ids.is_empty() {
            return Err("DPDK config must list at least one port id".into());
        }
        let jumbo_mtu = if config.mtu > 1500 {
            Some(u16::try_from(config.mtu).map_err(|_| {
                format!("DPDK MTU {} exceeds the maximum of {}", config.mtu, u16::MAX)
            })?)
        } else {
            None
        };
        let num_mbufs = u32::try_from(config.port_ids.len())
            .ok()
            .and_then(|ports| ports.checked_mul(MBUFS_PER_PORT_QUEUE))
            .and_then(|n| n.checked_mul(u32::from(config.num_queues).max(1)))
            .ok_or("DPDK mbuf pool size overflows u32")?;

        let eal_args: Vec<&str> = config.eal_args.iter().map(String::as_str).collect();
        let eal = Eal::init(&eal_args)?;
        let port_count = eal.port_count();
        if let Some(&missing) = config.port_ids.iter().find(|&&pid| pid >= port_count) {
            return Err(
                format!("DPDK port {} not found (available: {})", missing, port_count).into(),
            );
        }

        let mempool = match jumbo_mtu {
            Some(mtu) => Mempool::create_for_mtu("pktmbuf_pool", num_mbufs, mtu, 0)?,
            None => Mempool::create_with_size("pktmbuf_pool", num_mbufs, 0)?,
        };

        let mut ports = Vec::with_capacity(config.port_ids.len());
        let mut combined_offloads: Option<ChecksumOffloads> = None;
        for &pid in &config.port_ids {
            let mut port =
                Port::configure_with_vlan(pid, &mempool, config.vlan_id, config.num_queues)?;
            port.start()?;
            // Only advertise offloads that every port can perform.
            combined_offloads = Some(match combined_offloads {
                None => port.offloads,
                Some(prev) => prev.intersect(port.offloads),
            });
            ports.push(port);
        }

        // `ports` is non-empty: `port_ids` was checked above and every id produced a port.
        let mac = ports[0].mac_addr();
        let actual_queues = ports[0].num_queues;
        let offloads = combined_offloads.unwrap_or_default();
        let mempool_raw = mempool.as_raw();
        Ok(Arc::new(DpdkShared {
            _ports: ports,
            _mempool: mempool,
            _eal: eal,
            offloads,
            mac,
            mempool_raw,
            num_queues: actual_queues,
        }))
    }
}
impl DpdkTransport {
pub fn from_shared(
shared: &Arc<DpdkShared>,
config: &DpdkConfig,
queue_id: u16,
) -> Result<Self, Box<dyn std::error::Error>> {
let mut device = DpdkDevice::new(
&config.port_ids,
shared.mempool_raw,
shared.offloads,
queue_id,
);
if config.mtu != 1500 {
device.set_mtu(config.mtu);
tracing::info!(mtu = config.mtu, queue_id, "DPDK jumbo frames enabled");
}
if let Some(vlan_id) = config.vlan_id {
device.set_vlan_id(vlan_id);
}
let hw_addr = HardwareAddress::Ethernet(EthernetAddress(shared.mac));
let iface_config = Config::new(hw_addr);
let now = Instant::from_millis(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock is before UNIX epoch")
.as_millis() as i64,
);
let mut iface = Interface::new(iface_config, &mut DpdkDeviceRef(&device), now);
let ip = Ipv4Address::new(
config.ip_addr.octets()[0],
config.ip_addr.octets()[1],
config.ip_addr.octets()[2],
config.ip_addr.octets()[3],
);
iface.update_ip_addrs(|addrs| {
addrs
.push(IpCidr::new(IpAddress::Ipv4(ip), config.prefix_len))
.expect("IP address capacity");
});
if let Some(gw) = config.gateway {
let gw_addr = Ipv4Address::new(
gw.octets()[0],
gw.octets()[1],
gw.octets()[2],
gw.octets()[3],
);
iface
.routes_mut()
.add_default_ipv4_route(gw_addr)
.expect("default route capacity");
}
let mut sockets = SocketSet::new(Vec::with_capacity(MAX_CONNECTIONS));
let listen_socket = {
let rx_buf = tcp::SocketBuffer::new(vec![0u8; SOCKET_RX_BUF_SIZE]);
let tx_buf = tcp::SocketBuffer::new(vec![0u8; SOCKET_TX_BUF_SIZE]);
let mut socket = tcp::Socket::new(rx_buf, tx_buf);
tune_socket(&mut socket);
socket
.listen(config.listen_port)
.map_err(|e| format!("TCP listen failed: {e}"))?;
socket
};
let listen_handle = sockets.add(listen_socket);
tracing::info!(
ip = %config.ip_addr,
port = config.listen_port,
mac = ?shared.mac,
queue_id,
"DPDK transport initialized"
);
Ok(DpdkTransport {
_shared: Arc::clone(shared),
device,
iface,
sockets,
listeners: vec![ListenerEntry {
port: config.listen_port,
handle: listen_handle,
rx_buf_size: SOCKET_RX_BUF_SIZE,
tx_buf_size: SOCKET_TX_BUF_SIZE,
tx_queue_limit: MAX_TX_QUEUE_SIZE,
}],
accepted: Vec::new(),
tx_queues: (0..MAX_CONNECTIONS).map(|_| None).collect(),
tx_queue_limits: vec![MAX_TX_QUEUE_SIZE; MAX_CONNECTIONS],
cached_timestamp: now,
poll_count: 0,
pending_tx_bytes: 0,
})
}
pub fn from_shared_with_port(
shared: &Arc<DpdkShared>,
config: &DpdkConfig,
queue_id: u16,
listen_port: u16,
) -> Result<Self, Box<dyn std::error::Error>> {
let mut overridden = config.clone();
overridden.listen_port = listen_port;
Self::from_shared(shared, &overridden, queue_id)
}
pub fn connect_to(
&mut self,
remote_ip: std::net::Ipv4Addr,
remote_port: u16,
local_port: u16,
) -> SocketHandle {
self.connect_to_with_buffers(
remote_ip,
remote_port,
local_port,
SOCKET_RX_BUF_SIZE,
SOCKET_TX_BUF_SIZE,
MAX_TX_QUEUE_SIZE,
)
}
pub fn connect_to_with_buffers(
&mut self,
remote_ip: std::net::Ipv4Addr,
remote_port: u16,
local_port: u16,
rx_buf_size: usize,
tx_buf_size: usize,
tx_queue_limit: usize,
) -> SocketHandle {
let rx_buf = tcp::SocketBuffer::new(vec![0u8; rx_buf_size]);
let tx_buf = tcp::SocketBuffer::new(vec![0u8; tx_buf_size]);
let mut socket = tcp::Socket::new(rx_buf, tx_buf);
tune_socket(&mut socket);
socket.set_zero_copy_retain_fn(retain_mbuf);
socket.set_zero_copy_release_fn(release_mbuf);
let remote_addr = Ipv4Address::new(
remote_ip.octets()[0],
remote_ip.octets()[1],
remote_ip.octets()[2],
remote_ip.octets()[3],
);
let local_ip = self.iface.ipv4_addr().expect("interface has IPv4 address");
socket
.connect(
self.iface.context(),
(IpAddress::Ipv4(remote_addr), remote_port),
(IpAddress::Ipv4(local_ip), local_port),
)
.expect("smoltcp connect failed");
let handle = self.sockets.add(socket);
if handle.index() < self.tx_queue_limits.len() {
self.tx_queue_limits[handle.index()] = tx_queue_limit;
}
handle
}
pub fn is_connected(&mut self, handle: SocketHandle) -> bool {
match self.sockets.try_get_mut::<tcp::Socket>(handle) {
Some(socket) => socket.may_send() && socket.may_recv(),
None => {
tracing::warn!(
handle = ?handle,
site = "is_connected",
"DPDK: stale SocketHandle observed"
);
false
}
}
}
pub fn init(config: &DpdkConfig) -> Result<Self, Box<dyn std::error::Error>> {
let shared = DpdkShared::init(config)?;
Self::from_shared(&shared, config, 0)
}
pub fn poll(&mut self) -> Instant {
self.poll_count = self.poll_count.wrapping_add(1);
if self.poll_count.is_multiple_of(TIMESTAMP_REFRESH_INTERVAL) {
self.cached_timestamp = Instant::from_millis(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock is before UNIX epoch")
.as_millis() as i64,
);
}
let mut batch = self.device.collect_rx_batch();
for (mac, ip_bytes) in self.device.take_learned_neighbors() {
let ip = Ipv4Addr::new(ip_bytes[0], ip_bytes[1], ip_bytes[2], ip_bytes[3]);
self.seed_neighbor(ip, mac);
}
batch.append_injected(&mut self.device);
if !batch.is_empty() {
const MAX_SLICES: usize = 128;
let mut zc_buf: [std::mem::MaybeUninit<(&[u8], OpaqueFrameHandle)>; MAX_SLICES] =
[std::mem::MaybeUninit::uninit(); MAX_SLICES];
let count = batch.write_slices_with_handles(&mut zc_buf, mbuf_to_handle);
let frames = unsafe {
std::slice::from_raw_parts(
zc_buf.as_ptr().cast::<(&[u8], OpaqueFrameHandle)>(),
count,
)
};
self.iface.poll_ingress_batch_zero_copy(
self.cached_timestamp,
&mut self.device,
&mut self.sockets,
frames,
);
self.device.flush_tx();
}
let rx_had_data = !batch.is_empty();
batch.recycle(&mut self.device);
let has_pending_tx = self.pending_tx_bytes > 0;
if rx_had_data
|| has_pending_tx
|| self.poll_count.is_multiple_of(TIMESTAMP_REFRESH_INTERVAL)
{
self.flush_tx_queues();
self.iface
.poll(self.cached_timestamp, &mut self.device, &mut self.sockets);
self.device.flush_tx();
self.check_listener();
}
self.cached_timestamp
}
fn check_listener(&mut self) {
let mut i = 0;
while i < self.listeners.len() {
let port = self.listeners[i].port;
let handle = self.listeners[i].handle;
let (state, peer) = match self.sockets.try_get_mut::<tcp::Socket>(handle) {
Some(socket) => {
if socket.state() != State::Established {
i += 1;
continue;
}
let peer = match socket.remote_endpoint() {
Some(remote) => match remote.addr {
IpAddress::Ipv4(ip) => {
let octets = ip.octets();
std::net::SocketAddr::new(
std::net::IpAddr::V4(Ipv4Addr::new(
octets[0], octets[1], octets[2], octets[3],
)),
remote.port,
)
}
},
None => {
i += 1;
continue;
}
};
(socket.state(), peer)
}
None => {
tracing::warn!(
handle = ?handle,
port,
site = "check_listener",
"DPDK: stale listener handle, removing from listeners[]"
);
self.listeners.swap_remove(i);
continue;
}
};
let _ = state;
let Some(accepted_socket) = self.sockets.try_get_mut::<tcp::Socket>(handle) else {
tracing::warn!(
handle = ?handle,
port,
site = "check_listener.accepted",
"DPDK: accepted handle vanished mid-iteration"
);
self.listeners.swap_remove(i);
continue;
};
accepted_socket.set_zero_copy_retain_fn(retain_mbuf);
accepted_socket.set_zero_copy_release_fn(release_mbuf);
let entry = &self.listeners[i];
let rx_buf_size = entry.rx_buf_size;
let tx_buf_size = entry.tx_buf_size;
let tx_queue_limit = entry.tx_queue_limit;
if handle.index() < self.tx_queue_limits.len() {
self.tx_queue_limits[handle.index()] = tx_queue_limit;
}
let new_listener = {
let rx_buf = tcp::SocketBuffer::new(vec![0u8; rx_buf_size]);
let tx_buf = tcp::SocketBuffer::new(vec![0u8; tx_buf_size]);
let mut socket = tcp::Socket::new(rx_buf, tx_buf);
tune_socket(&mut socket);
socket.listen(port).expect("re-listen after accept");
socket
};
let new_handle = self.sockets.add(new_listener);
self.listeners[i].handle = new_handle;
self.accepted.push(AcceptedConnection {
handle,
peer,
listen_port: port,
});
tracing::debug!(peer = %peer, listen_port = port, "DPDK: TCP connection accepted");
}
}
pub fn add_listener(&mut self, port: u16) -> Result<(), Box<dyn std::error::Error>> {
self.add_listener_with_buffers(
port,
SOCKET_RX_BUF_SIZE,
SOCKET_TX_BUF_SIZE,
MAX_TX_QUEUE_SIZE,
)
}
pub fn add_listener_with_buffers(
&mut self,
port: u16,
rx_buf_size: usize,
tx_buf_size: usize,
tx_queue_limit: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let rx_buf = tcp::SocketBuffer::new(vec![0u8; rx_buf_size]);
let tx_buf = tcp::SocketBuffer::new(vec![0u8; tx_buf_size]);
let mut socket = tcp::Socket::new(rx_buf, tx_buf);
tune_socket(&mut socket);
socket
.listen(port)
.map_err(|e| format!("TCP listen on port {port} failed: {e}"))?;
let handle = self.sockets.add(socket);
self.listeners.push(ListenerEntry {
port,
handle,
rx_buf_size,
tx_buf_size,
tx_queue_limit,
});
tracing::info!(port, "DPDK transport: added listener");
Ok(())
}
fn flush_tx_queues(&mut self) {
let Self {
tx_queues,
sockets,
pending_tx_bytes,
..
} = self;
for (handle, socket) in sockets.iter_mut() {
let Some(queue) = tx_queues.get_mut(handle.index()).and_then(|s| s.as_mut()) else {
continue;
};
if queue.queued_bytes() == 0 {
continue;
}
let smoltcp::socket::Socket::Tcp(socket) = socket;
if !socket.can_send() {
continue;
}
let sent = socket.send_slice(queue.pending()).unwrap_or(0);
if sent > 0 {
queue.advance(sent);
*pending_tx_bytes -= sent;
}
}
}
pub fn take_accepted(&mut self) -> Vec<AcceptedConnection> {
std::mem::take(&mut self.accepted)
}
pub fn recv(&mut self, handle: SocketHandle, buf: &mut [u8]) -> usize {
let Some(socket) = self.sockets.try_get_mut::<tcp::Socket>(handle) else {
tracing::warn!(handle = ?handle, site = "recv", "DPDK: stale SocketHandle observed");
return 0;
};
if !socket.can_recv() {
return 0;
}
socket.recv_slice(buf).unwrap_or(0)
}
pub fn recv_into_vec(&mut self, handle: SocketHandle, dest: &mut Vec<u8>) -> usize {
let Some(socket) = self.sockets.try_get_mut::<tcp::Socket>(handle) else {
tracing::warn!(handle = ?handle, site = "recv_into_vec", "DPDK: stale SocketHandle observed");
return 0;
};
if !socket.can_recv() {
return 0;
}
socket
.recv_zero_copy(|data| {
if dest.try_reserve(data.len()).is_err() {
return 0;
}
dest.extend_from_slice(data);
data.len()
})
.unwrap_or(0)
}
pub fn queue_send(&mut self, handle: SocketHandle, data: &[u8]) -> bool {
let limit = self.tx_queue_limits[handle.index()];
let slot = &mut self.tx_queues[handle.index()];
let queue = slot.get_or_insert_with(TxQueue::new);
if queue.queued_bytes() + data.len() > limit {
return false;
}
queue.push(data);
self.pending_tx_bytes += data.len();
true
}
pub fn tx_queue_bytes(&self, handle: SocketHandle) -> usize {
self.tx_queues[handle.index()]
.as_ref()
.map_or(0, |q| q.queued_bytes())
}
pub fn max_tx_queue_size(&self, handle: SocketHandle) -> usize {
self.tx_queue_limits[handle.index()]
}
pub fn is_active(&mut self, handle: SocketHandle) -> bool {
match self.sockets.try_get_mut::<tcp::Socket>(handle) {
Some(socket) => socket.is_active(),
None => {
tracing::warn!(
handle = ?handle,
site = "is_active",
"DPDK: stale SocketHandle observed"
);
false
}
}
}
pub fn close(&mut self, handle: SocketHandle) {
if let Some(socket) = self.sockets.try_get_mut::<tcp::Socket>(handle) {
socket.abort();
self.sockets.remove(handle);
} else {
tracing::warn!(
handle = ?handle,
site = "close",
"DPDK: close called on already-removed handle"
);
}
if let Some(q) = self.tx_queues[handle.index()].take() {
self.pending_tx_bytes -= q.queued_bytes();
}
}
pub fn send_gratuitous_arp(&mut self) {
let our_mac = self._shared.mac;
let our_ip = self.iface.ipv4_addr().expect("interface has IPv4 address");
let mut frame = [0u8; 42];
frame[0..6].copy_from_slice(&[0xff, 0xff, 0xff, 0xff, 0xff, 0xff]);
frame[6..12].copy_from_slice(&our_mac);
frame[12..14].copy_from_slice(&[0x08, 0x06]);
frame[14..16].copy_from_slice(&[0x00, 0x01]); frame[16..18].copy_from_slice(&[0x08, 0x00]); frame[18] = 6; frame[19] = 4; frame[20..22].copy_from_slice(&[0x00, 0x01]); frame[22..28].copy_from_slice(&our_mac); frame[28..32].copy_from_slice(&our_ip.octets()); frame[32..38].copy_from_slice(&[0x00; 6]); frame[38..42].copy_from_slice(&our_ip.octets());
self.device.send_raw_frame(&frame);
tracing::info!(
mac = ?our_mac,
ip = %our_ip,
"sent gratuitous ARP (switch MAC learning)"
);
}
pub fn seed_neighbor(&mut self, ip: Ipv4Addr, mac: [u8; 6]) {
let our_mac = self._shared.mac;
let our_ip = self.iface.ipv4_addr().expect("interface has IPv4 address");
let mut frame = [0u8; 42];
frame[0..6].copy_from_slice(&our_mac); frame[6..12].copy_from_slice(&mac); frame[12..14].copy_from_slice(&[0x08, 0x06]);
frame[14..16].copy_from_slice(&[0x00, 0x01]); frame[16..18].copy_from_slice(&[0x08, 0x00]); frame[18] = 6; frame[19] = 4; frame[20..22].copy_from_slice(&[0x00, 0x02]); frame[22..28].copy_from_slice(&mac); frame[28..32].copy_from_slice(&ip.octets()); frame[32..38].copy_from_slice(&our_mac); frame[38..42].copy_from_slice(&our_ip.octets());
self.device.inject_rx(frame.to_vec());
tracing::debug!(
peer_ip = %ip,
peer_mac = ?mac,
"seeded neighbor cache with ARP reply"
);
}
}
/// Stand-in device passed to `Interface::new` while a transport is being built.
///
/// It forwards the wrapped [`DpdkDevice`]'s capabilities. `receive` and `transmit` never
/// produce tokens, so no traffic can flow through it.
struct DpdkDeviceRef<'a>(&'a DpdkDevice);

impl smoltcp::phy::Device for DpdkDeviceRef<'_> {
    type RxToken<'b>
        = crate::device::DpdkRxToken
    where
        Self: 'b;
    type TxToken<'b>
        = crate::device::DpdkTxToken<'b>
    where
        Self: 'b;

    fn capabilities(&self) -> smoltcp::phy::DeviceCapabilities {
        self.0.capabilities()
    }

    fn receive(&mut self, _now: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
        // Never yields a frame: the real RX path goes through `DpdkDevice` directly.
        None
    }

    fn transmit(&mut self, _now: Instant) -> Option<Self::TxToken<'_>> {
        None
    }
}