use super::{CoreId, Kni, KniBuilder, KniTxQueue, Mbuf, Mempool, MempoolMap, SocketId};
use crate::dpdk::DpdkError;
use crate::ffi::{self, AsStr, ToCString, ToResult};
#[cfg(feature = "metrics")]
use crate::metrics::{labels, Counter, SINK};
use crate::net::MacAddr;
#[cfg(feature = "pcap-dump")]
use crate::pcap;
use crate::{debug, ensure, info, warn};
use anyhow::Result;
use std::collections::HashMap;
use std::fmt;
use std::os::raw;
use std::ptr;
use thiserror::Error;
/// RSS hash types requested when a port is spread across more than one core.
///
/// `PortBuilder::finish` masks this with the device's `flow_type_rss_offloads` before use.
const DEFAULT_RSS_HF: u64 = ffi::ETH_RSS_IP as u64
    | ffi::ETH_RSS_TCP as u64
    | ffi::ETH_RSS_UDP as u64
    | ffi::ETH_RSS_SCTP as u64;
/// DPDK identifier of an ethernet port.
#[derive(Copy, Clone)]
pub(crate) struct PortId(u16);

impl PortId {
    /// Returns the socket DPDK reports for this port.
    ///
    /// Returns `None` when the reported id is not contained in `SocketId::all()`.
    #[inline]
    pub(crate) fn socket_id(self) -> Option<SocketId> {
        let reported = unsafe { SocketId(ffi::rte_eth_dev_socket_id(self.0)) };
        Some(reported).filter(|candidate| SocketId::all().contains(candidate))
    }

    /// Returns the raw DPDK port number.
    #[allow(clippy::trivially_copy_pass_by_ref)]
    #[inline]
    pub(crate) fn raw(&self) -> u16 {
        let PortId(number) = *self;
        number
    }
}

impl fmt::Debug for PortId {
    /// Formats as `port<N>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let number = self.raw();
        write!(f, "port{}", number)
    }
}
/// Index of an RX queue on a port.
#[derive(Copy, Clone)]
pub(crate) struct RxQueueIndex(u16);

impl RxQueueIndex {
    /// Returns the raw DPDK RX queue index.
    #[allow(clippy::trivially_copy_pass_by_ref, dead_code)]
    #[inline]
    pub(crate) fn raw(&self) -> u16 {
        let RxQueueIndex(index) = *self;
        index
    }
}

/// Index of a TX queue on a port.
#[derive(Copy, Clone)]
pub(crate) struct TxQueueIndex(u16);

impl TxQueueIndex {
    /// Returns the raw DPDK TX queue index.
    #[allow(clippy::trivially_copy_pass_by_ref, dead_code)]
    #[inline]
    pub(crate) fn raw(&self) -> u16 {
        let TxQueueIndex(index) = *self;
        index
    }
}

/// Either an RX or a TX queue of a port.
#[allow(dead_code)]
pub(crate) enum RxTxQueue {
    /// An RX queue.
    Rx(RxQueueIndex),
    /// A TX queue.
    Tx(TxQueueIndex),
}
/// A handle to one RX queue and one TX queue of a port.
///
/// `PortBuilder::finish` creates one of these for every core the port is bound to.
// NOTE(review): presumably `Debug` is not derived because some field types (e.g.
// `KniTxQueue`, `Counter`) do not implement it — confirm.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct PortQueue {
    /// Port the queues belong to.
    port_id: PortId,
    /// RX queue polled by `receive`.
    rxq: RxQueueIndex,
    /// TX queue written by `transmit`.
    txq: TxQueueIndex,
    /// TX queue of the port's KNI interface, set only when the port was built with KNI.
    kni: Option<KniTxQueue>,
    /// Packets received on `rxq` (metrics feature only; set by `set_counters`).
    #[cfg(feature = "metrics")]
    received: Option<Counter>,
    /// Packets accepted by `txq` (metrics feature only; set by `set_counters`).
    #[cfg(feature = "metrics")]
    transmitted: Option<Counter>,
    /// Packets freed because `txq` accepted none of them (metrics feature only).
    #[cfg(feature = "metrics")]
    dropped: Option<Counter>,
}
impl PortQueue {
    /// Creates a handle for RX queue `rxq` and TX queue `txq` of `port`.
    ///
    /// The KNI queue and, with the `metrics` feature, the counters start unset.
    /// `PortBuilder::finish` fills them in through `set_kni` / `set_counters`.
    fn new(port: PortId, rxq: RxQueueIndex, txq: TxQueueIndex) -> Self {
        PortQueue {
            port_id: port,
            rxq,
            txq,
            kni: None,
            #[cfg(feature = "metrics")]
            received: None,
            #[cfg(feature = "metrics")]
            transmitted: None,
            #[cfg(feature = "metrics")]
            dropped: None,
        }
    }

    /// Polls the RX queue once and returns the packets received.
    ///
    /// Returns up to 32 packets per call. The result may be empty.
    pub(crate) fn receive(&self) -> Vec<Mbuf> {
        const RX_BURST_MAX: usize = 32;
        let mut ptrs = Vec::with_capacity(RX_BURST_MAX);
        // SAFETY: `ptrs` has capacity for RX_BURST_MAX pointers, which is the maximum the
        // burst call is told it may write.
        let len = unsafe {
            ffi::_rte_eth_rx_burst(
                self.port_id.0,
                self.rxq.0,
                ptrs.as_mut_ptr(),
                RX_BURST_MAX as u16,
            )
        };
        #[cfg(feature = "metrics")]
        self.received
            .as_ref()
            .expect("counters are set by PortBuilder::finish")
            .record(len as u64);
        // SAFETY: the burst call wrote `len` (<= RX_BURST_MAX) valid mbuf pointers into the
        // front of `ptrs`.
        unsafe {
            ptrs.set_len(len as usize);
            ptrs.into_iter()
                .map(|ptr| Mbuf::from_ptr(ptr))
                .collect::<Vec<_>>()
        }
    }

    /// Transmits `packets` on the TX queue.
    ///
    /// Keeps bursting until every packet has been accepted. If a burst accepts none, the
    /// remaining packets are freed and counted as dropped. An empty `packets` is a no-op.
    pub(crate) fn transmit(&self, packets: Vec<Mbuf>) {
        let mut ptrs = packets.into_iter().map(Mbuf::into_ptr).collect::<Vec<_>>();
        while !ptrs.is_empty() {
            // The burst count is a u16. Cap it so large batches go out in several bursts
            // instead of having their length wrapped (which previously leaked or dropped
            // packets).
            let to_send = ptrs.len().min(usize::from(u16::MAX)) as u16;
            // SAFETY: `ptrs` holds at least `to_send` valid mbuf pointers.
            let sent = unsafe {
                ffi::_rte_eth_tx_burst(self.port_id.0, self.txq.0, ptrs.as_mut_ptr(), to_send)
            };
            if sent == 0 {
                #[cfg(feature = "metrics")]
                self.dropped
                    .as_ref()
                    .expect("counters are set by PortBuilder::finish")
                    .record(ptrs.len() as u64);
                super::mbuf_free_bulk(ptrs);
                return;
            }
            #[cfg(feature = "metrics")]
            self.transmitted
                .as_ref()
                .expect("counters are set by PortBuilder::finish")
                .record(sent as u64);
            // The first `sent` pointers now belong to the driver. Forget them and retry the rest.
            ptrs.drain(..sent as usize);
        }
    }

    /// Returns the TX queue of the port's KNI interface, if the port was built with KNI.
    pub fn kni(&self) -> Option<&KniTxQueue> {
        self.kni.as_ref()
    }

    /// Attaches the port's KNI TX queue to this handle.
    fn set_kni(&mut self, kni: KniTxQueue) {
        self.kni = Some(kni);
    }

    /// Registers this queue's rx/tx packet counters and its tx drop counter.
    ///
    /// The counters are labelled with the port name and core id.
    #[cfg(feature = "metrics")]
    fn set_counters(&mut self, port: &str, core_id: CoreId) {
        // All three counters share the scope and label set. Only the metric name and the
        // direction label differ.
        let make_counter = |name: &'static str, dir: &'static str| {
            SINK.scoped("port").counter_with_labels(
                name,
                labels!(
                    "port" => port.to_owned(),
                    "dir" => dir,
                    "core" => core_id.0.to_string(),
                ),
            )
        };
        self.received = Some(make_counter("packets", "rx"));
        self.transmitted = Some(make_counter("packets", "tx"));
        self.dropped = Some(make_counter("dropped", "tx"));
    }

    /// Queries DPDK for the MAC address of the port this queue belongs to.
    pub fn mac_addr(&self) -> MacAddr {
        super::eth_macaddr_get(self.port_id.0)
    }
}
/// Errors raised while building a port.
#[derive(Debug, Error)]
pub(crate) enum PortError {
    /// No port with the given name exists.
    #[error("Port {0} is not found.")]
    NotFound(String),
    /// An empty core list was supplied to `PortBuilder::cores`.
    #[error("Port is not bound to any cores.")]
    CoreNotBound,
    /// The device supports fewer RX queues than requested cores; carries the device maximum.
    #[error("Insufficient number of RX queues '{0}'.")]
    InsufficientRxQueues(usize),
    /// The device supports fewer TX queues than requested cores; carries the device maximum.
    #[error("Insufficient number of TX queues '{0}'.")]
    InsufficientTxQueues(usize),
}
/// A configured DPDK ethernet port, produced by `PortBuilder::finish`.
///
/// Dropping it closes the device.
pub(crate) struct Port {
    /// DPDK port id.
    id: PortId,
    /// Application-level name used in logs.
    name: String,
    /// Device name the port id was resolved from.
    device: String,
    /// One RX/TX queue handle per bound core, keyed by core.
    queues: HashMap<CoreId, PortQueue>,
    /// KNI interface, present only when built with `with_kni`.
    kni: Option<Kni>,
    /// Device info captured when the builder was created.
    dev_info: ffi::rte_eth_dev_info,
}
impl Port {
    /// Returns the DPDK port id.
    pub(crate) fn id(&self) -> PortId {
        self.id
    }

    /// Returns the application-level port name.
    pub(crate) fn name(&self) -> &str {
        &self.name
    }

    /// Queries DPDK for the port's MAC address.
    pub(crate) fn mac_addr(&self) -> MacAddr {
        let port_id = self.id.raw();
        super::eth_macaddr_get(port_id)
    }

    /// Returns the per-core queue handles, keyed by the core each is bound to.
    pub(crate) fn queues(&self) -> &HashMap<CoreId, PortQueue> {
        &self.queues
    }

    /// Returns the port's KNI interface, if it was built with one.
    pub(crate) fn kni(&mut self) -> Option<&mut Kni> {
        self.kni.as_mut()
    }

    /// Starts the device with `rte_eth_dev_start`.
    ///
    /// # Errors
    ///
    /// Returns the error `DpdkError::from_errno` builds from DPDK's return code on failure.
    pub(crate) fn start(&mut self) -> Result<()> {
        let rc = unsafe { ffi::rte_eth_dev_start(self.id.raw()) };
        rc.into_result(DpdkError::from_errno)?;
        info!("started port {}.", self.name());
        Ok(())
    }

    /// Stops the device with `rte_eth_dev_stop`. DPDK's return value, if any, is not checked.
    pub(crate) fn stop(&mut self) {
        let port_id = self.id.raw();
        unsafe {
            ffi::rte_eth_dev_stop(port_id);
        }
        info!("stopped port {}.", self.name());
    }

    /// Collects the port's statistics.
    #[cfg(feature = "metrics")]
    pub(crate) fn stats(&self) -> super::PortStats {
        super::PortStats::build(self)
    }
}
impl fmt::Debug for Port {
    /// Formats the port as a struct named after the port.
    ///
    /// Includes device, id, MAC, driver, offload capabilities, queue limits and socket
    /// (`-1` when `PortId::socket_id` returns `None`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Borrow the device info rather than copying the whole struct.
        let info = &self.dev_info;
        let mut out = f.debug_struct(self.name());
        out.field("device", &self.device);
        out.field("port", &self.id.0);
        out.field("mac", &format_args!("\"{}\"", self.mac_addr()));
        out.field("driver", &info.driver_name.as_str());
        out.field("rx_offload", &format_args!("{:#x}", info.rx_offload_capa));
        out.field("tx_offload", &format_args!("{:#x}", info.tx_offload_capa));
        out.field("max_rxq", &info.max_rx_queues);
        out.field("max_txq", &info.max_tx_queues);
        out.field("socket", &self.id.socket_id().map_or(-1, |s| s.0));
        out.finish()
    }
}
impl Drop for Port {
    /// Closes the underlying DPDK device.
    fn drop(&mut self) {
        debug!("freeing {}.", self.name);
        let port_id = self.id.raw();
        // SAFETY: `Port` is not `Clone`, so this is the only close issued for this port id.
        unsafe {
            ffi::rte_eth_dev_close(port_id);
        }
    }
}
/// Configures a DPDK ethernet device step by step and produces a `Port` via `finish`.
pub(crate) struct PortBuilder<'a> {
    /// Application-level name, used in logs, KNI naming, pcap capture and metrics labels.
    name: String,
    /// Device name the port id is resolved from.
    device: String,
    /// DPDK port id resolved from `device`.
    port_id: PortId,
    /// Device capabilities queried in `new`.
    dev_info: ffi::rte_eth_dev_info,
    /// Sorted, de-duplicated cores. One RX/TX queue pair is created per core (defaults to core 0).
    cores: Vec<CoreId>,
    /// Mempools, looked up by socket id when setting up RX queues.
    mempools: MempoolMap<'a>,
    /// RX descriptors per queue, as adjusted by `rx_tx_queue_capacity`.
    rxd: u16,
    /// TX descriptors per queue, as adjusted by `rx_tx_queue_capacity`.
    txd: u16,
}
impl<'a> PortBuilder<'a> {
pub(crate) fn new(name: String, device: String) -> Result<Self> {
let mut port_id = 0u16;
unsafe {
ffi::rte_eth_dev_get_port_by_name(device.clone().into_cstring().as_ptr(), &mut port_id)
.into_result(DpdkError::from_errno)?;
}
let port_id = PortId(port_id);
debug!("{} is {:?}.", name, port_id);
let mut dev_info = ffi::rte_eth_dev_info::default();
unsafe {
ffi::rte_eth_dev_info_get(port_id.0, &mut dev_info);
}
Ok(PortBuilder {
name,
device,
port_id,
dev_info,
cores: vec![CoreId::new(0)],
mempools: Default::default(),
rxd: 0,
txd: 0,
})
}
pub(crate) fn cores(&mut self, cores: &[CoreId]) -> Result<&mut Self> {
ensure!(!cores.is_empty(), PortError::CoreNotBound);
let mut cores = cores.to_vec();
cores.sort();
cores.dedup();
let len = cores.len() as u16;
ensure!(
self.dev_info.max_rx_queues >= len,
PortError::InsufficientRxQueues(self.dev_info.max_rx_queues as usize)
);
ensure!(
self.dev_info.max_tx_queues >= len,
PortError::InsufficientTxQueues(self.dev_info.max_tx_queues as usize)
);
self.cores = cores;
Ok(self)
}
pub(crate) fn rx_tx_queue_capacity(&mut self, rxd: usize, txd: usize) -> Result<&mut Self> {
let mut rxd2 = rxd as u16;
let mut txd2 = txd as u16;
unsafe {
ffi::rte_eth_dev_adjust_nb_rx_tx_desc(self.port_id.0, &mut rxd2, &mut txd2)
.into_result(DpdkError::from_errno)?;
}
info!(
cond: rxd2 != rxd as u16,
message = "adjusted rxd.",
before = rxd,
after = rxd2
);
info!(
cond: txd2 != txd as u16,
message = "adjusted txd.",
before = txd,
after = txd2
);
self.rxd = rxd2;
self.txd = txd2;
Ok(self)
}
pub(crate) fn mempools(&'a mut self, mempools: &'a mut [Mempool]) -> &'a mut Self {
self.mempools = MempoolMap::new(mempools);
self
}
#[allow(clippy::cognitive_complexity)]
pub(crate) fn finish(
&mut self,
promiscuous: bool,
multicast: bool,
with_kni: bool,
) -> anyhow::Result<Port> {
let len = self.cores.len() as u16;
let mut conf = ffi::rte_eth_conf::default();
if len > 1 {
conf.rxmode.mq_mode = ffi::rte_eth_rx_mq_mode::ETH_MQ_RX_RSS;
conf.rx_adv_conf.rss_conf.rss_hf =
DEFAULT_RSS_HF & self.dev_info.flow_type_rss_offloads;
}
if self.dev_info.tx_offload_capa & ffi::DEV_TX_OFFLOAD_MBUF_FAST_FREE as u64 > 0 {
conf.txmode.offloads |= ffi::DEV_TX_OFFLOAD_MBUF_FAST_FREE as u64;
debug!("turned on optimization for fast release of mbufs.");
}
unsafe {
ffi::rte_eth_dev_configure(self.port_id.0, len, len, &conf)
.into_result(DpdkError::from_errno)?;
}
let socket_id = self
.port_id
.socket_id()
.unwrap_or_else(|| self.cores[0].socket_id());
debug!("{} connected to {:?}.", self.name, socket_id);
let mempool = self.mempools.get_raw(socket_id)?;
let kni = if with_kni {
let kni = KniBuilder::new(mempool)
.name(&self.name)
.port_id(self.port_id)
.mac_addr(super::eth_macaddr_get(self.port_id.raw()))
.finish()?;
Some(kni)
} else {
None
};
let mut queues = HashMap::new();
for (idx, &core_id) in self.cores.iter().enumerate() {
warn!(
cond: core_id.socket_id() != socket_id,
message = "core socket does not match port socket.",
core = ?core_id,
core_socket = core_id.socket_id().0,
port_socket = socket_id.0
);
let rxq = RxQueueIndex(idx as u16);
unsafe {
ffi::rte_eth_rx_queue_setup(
self.port_id.0,
rxq.0,
self.rxd,
socket_id.0 as raw::c_uint,
ptr::null(),
mempool,
)
.into_result(DpdkError::from_errno)?;
}
let txq = TxQueueIndex(idx as u16);
unsafe {
ffi::rte_eth_tx_queue_setup(
self.port_id.0,
txq.0,
self.txd,
socket_id.0 as raw::c_uint,
ptr::null(),
)
.into_result(DpdkError::from_errno)?;
}
#[cfg(feature = "pcap-dump")]
{
pcap::capture_queue(
self.port_id,
self.name.as_str(),
core_id,
RxTxQueue::Rx(rxq),
)?;
pcap::capture_queue(
self.port_id,
self.name.as_str(),
core_id,
RxTxQueue::Tx(txq),
)?;
}
let mut q = PortQueue::new(self.port_id, rxq, txq);
if let Some(kni) = &kni {
q.set_kni(kni.txq());
}
#[cfg(feature = "metrics")]
q.set_counters(&self.name, core_id);
queues.insert(core_id, q);
debug!("initialized port queue for {:?}.", core_id);
}
unsafe {
if promiscuous {
ffi::rte_eth_promiscuous_enable(self.port_id.0);
} else {
ffi::rte_eth_promiscuous_disable(self.port_id.0);
}
if multicast {
ffi::rte_eth_allmulticast_enable(self.port_id.0);
} else {
ffi::rte_eth_allmulticast_disable(self.port_id.0);
}
}
info!("initialized port {}.", self.name);
Ok(Port {
id: self.port_id,
name: self.name.clone(),
device: self.device.clone(),
queues,
kni,
dev_info: self.dev_info,
})
}
}