use super::{Mbuf, PortId};
use crate::dpdk::DpdkError;
use crate::ffi::{self, AsStr, ToResult};
#[cfg(feature = "metrics")]
use crate::metrics::{labels, Counter, SINK};
use crate::net::MacAddr;
use crate::{debug, error, warn};
use anyhow::Result;
use futures::{future, Future, StreamExt};
use std::cmp;
use std::mem;
use std::os::raw;
use std::ptr::{self, NonNull};
use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
#[cfg(feature = "metrics")]
fn new_counter(name: &'static str, kni: &str, dir: &'static str) -> Counter {
SINK.scoped("kni").counter_with_labels(
name,
labels!(
"kni" => kni.to_string(),
"dir" => dir,
),
)
}
#[allow(missing_debug_implementations)]
pub struct KniRx {
raw: NonNull<ffi::rte_kni>,
#[cfg(feature = "metrics")]
packets: Counter,
#[cfg(feature = "metrics")]
octets: Counter,
}
impl KniRx {
#[cfg(not(feature = "metrics"))]
pub fn new(raw: NonNull<ffi::rte_kni>) -> Self {
KniRx { raw }
}
#[cfg(feature = "metrics")]
pub fn new(raw: NonNull<ffi::rte_kni>) -> Self {
let name = unsafe { ffi::rte_kni_get_name(raw.as_ref()).as_str().to_owned() };
let packets = new_counter("packets", &name, "rx");
let octets = new_counter("octets", &name, "rx");
KniRx {
raw,
packets,
octets,
}
}
pub fn receive(&mut self) -> Vec<Mbuf> {
const RX_BURST_MAX: usize = 32;
let mut ptrs = Vec::with_capacity(RX_BURST_MAX);
let len = unsafe {
ffi::rte_kni_rx_burst(
self.raw.as_mut(),
ptrs.as_mut_ptr(),
RX_BURST_MAX as raw::c_uint,
)
};
let mbufs = unsafe {
ptrs.set_len(len as usize);
ptrs.into_iter()
.map(|ptr| Mbuf::from_ptr(ptr))
.collect::<Vec<_>>()
};
unsafe {
if let Err(err) =
ffi::rte_kni_handle_request(self.raw.as_mut()).into_result(|_| DpdkError::new())
{
warn!(message = "failed to handle change link requests.", ?err);
}
}
#[cfg(feature = "metrics")]
{
self.packets.record(mbufs.len() as u64);
let bytes: usize = mbufs.iter().map(Mbuf::data_len).sum();
self.octets.record(bytes as u64);
}
mbufs
}
}
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct KniTxQueue {
tx_enque: UnboundedSender<Vec<Mbuf>>,
}
impl KniTxQueue {
pub fn transmit(&mut self, packets: Vec<Mbuf>) {
if let Err(err) = self.tx_enque.try_send(packets) {
warn!(message = "failed to send to kni tx queue.");
Mbuf::free_bulk(err.into_inner());
}
}
}
pub(crate) struct KniTx {
raw: NonNull<ffi::rte_kni>,
tx_deque: Option<UnboundedReceiver<Vec<Mbuf>>>,
#[cfg(feature = "metrics")]
packets: Counter,
#[cfg(feature = "metrics")]
octets: Counter,
#[cfg(feature = "metrics")]
dropped: Counter,
}
impl KniTx {
#[cfg(not(feature = "metrics"))]
pub(crate) fn new(raw: NonNull<ffi::rte_kni>, tx_deque: UnboundedReceiver<Vec<Mbuf>>) -> Self {
KniTx {
raw,
tx_deque: Some(tx_deque),
}
}
#[cfg(feature = "metrics")]
pub(crate) fn new(raw: NonNull<ffi::rte_kni>, tx_deque: UnboundedReceiver<Vec<Mbuf>>) -> Self {
let name = unsafe { ffi::rte_kni_get_name(raw.as_ref()).as_str().to_owned() };
let packets = new_counter("packets", &name, "tx");
let octets = new_counter("octets", &name, "tx");
let dropped = new_counter("dropped", &name, "tx");
KniTx {
raw,
tx_deque: Some(tx_deque),
packets,
octets,
dropped,
}
}
pub(crate) fn transmit(&mut self, packets: Vec<Mbuf>) {
let mut ptrs = packets.into_iter().map(Mbuf::into_ptr).collect::<Vec<_>>();
loop {
let to_send = ptrs.len() as raw::c_uint;
let sent =
unsafe { ffi::rte_kni_tx_burst(self.raw.as_mut(), ptrs.as_mut_ptr(), to_send) };
if sent > 0 {
#[cfg(feature = "metrics")]
{
self.packets.record(sent as u64);
let bytes: u64 = ptrs[..sent as usize]
.iter()
.map(|&ptr| unsafe { (*ptr).data_len as u64 })
.sum();
self.octets.record(bytes);
}
if to_send - sent > 0 {
let _ = ptrs.drain(..sent as usize);
} else {
break;
}
} else {
#[cfg(feature = "metrics")]
self.dropped.record(to_send as u64);
super::mbuf_free_bulk(ptrs);
break;
}
}
}
pub(crate) fn into_pipeline(mut self) -> impl Future<Output = ()> {
self.tx_deque.take().unwrap().for_each(move |packets| {
self.transmit(packets);
future::ready(())
})
}
}
unsafe impl Send for KniRx {}
unsafe impl Send for KniTx {}
#[derive(Debug, Error)]
pub(crate) enum KniError {
#[error("KNI is not enabled for the port.")]
Disabled,
#[error("Another core owns the handle.")]
NotAcquired,
}
pub(crate) struct Kni {
raw: NonNull<ffi::rte_kni>,
rx: Option<KniRx>,
tx: Option<KniTx>,
txq: KniTxQueue,
}
impl Kni {
pub(crate) fn new(raw: NonNull<ffi::rte_kni>) -> Kni {
let (send, recv) = mpsc::unbounded_channel();
let rx = KniRx::new(raw);
let tx = KniTx::new(raw, recv);
let txq = KniTxQueue { tx_enque: send };
Kni {
raw,
rx: Some(rx),
tx: Some(tx),
txq,
}
}
pub(crate) fn take_rx(&mut self) -> Result<KniRx> {
self.rx.take().ok_or_else(|| KniError::NotAcquired.into())
}
pub(crate) fn take_tx(&mut self) -> Result<KniTx> {
self.tx.take().ok_or_else(|| KniError::NotAcquired.into())
}
pub(crate) fn txq(&self) -> KniTxQueue {
self.txq.clone()
}
#[inline]
pub(crate) fn raw_mut(&mut self) -> &mut ffi::rte_kni {
unsafe { self.raw.as_mut() }
}
}
impl Drop for Kni {
fn drop(&mut self) {
debug!("freeing kernel interface.");
if let Err(err) =
unsafe { ffi::rte_kni_release(self.raw_mut()).into_result(|_| DpdkError::new()) }
{
error!(message = "failed to release KNI device.", ?err);
}
}
}
extern "C" fn change_mtu(port_id: u16, new_mtu: raw::c_uint) -> raw::c_int {
warn!("ignored change port {} mtu to {}.", port_id, new_mtu);
-1
}
extern "C" fn config_network_if(port_id: u16, if_up: u8) -> raw::c_int {
warn!("ignored change port {} status to {}.", port_id, if_up);
0
}
extern "C" fn config_mac_address(port_id: u16, _mac_addr: *mut u8) -> raw::c_int {
warn!("ignored change port {} mac address.", port_id);
-1
}
extern "C" fn config_promiscusity(port_id: u16, to_on: u8) -> raw::c_int {
warn!("ignored change port {} promiscusity to {}.", port_id, to_on);
-1
}
pub(crate) struct KniBuilder<'a> {
mempool: &'a mut ffi::rte_mempool,
conf: ffi::rte_kni_conf,
ops: ffi::rte_kni_ops,
}
impl<'a> KniBuilder<'a> {
pub(crate) fn new(mempool: &'a mut ffi::rte_mempool) -> Self {
KniBuilder {
mempool,
conf: ffi::rte_kni_conf::default(),
ops: ffi::rte_kni_ops::default(),
}
}
pub(crate) fn name(&mut self, name: &str) -> &mut Self {
unsafe {
self.conf.name = mem::zeroed();
ptr::copy(
name.as_ptr(),
self.conf.name.as_mut_ptr() as *mut u8,
cmp::min(name.len(), self.conf.name.len()),
);
}
self
}
pub(crate) fn port_id(&mut self, port_id: PortId) -> &mut Self {
self.conf.group_id = port_id.raw();
self.ops.port_id = port_id.raw();
self
}
pub(crate) fn mac_addr(&mut self, mac: MacAddr) -> &mut Self {
unsafe {
self.conf.mac_addr = mem::transmute(mac);
}
self
}
pub(crate) fn finish(&mut self) -> Result<Kni> {
self.conf.mbuf_size = ffi::RTE_MBUF_DEFAULT_BUF_SIZE;
self.ops.change_mtu = Some(change_mtu);
self.ops.config_network_if = Some(config_network_if);
self.ops.config_mac_address = Some(config_mac_address);
self.ops.config_promiscusity = Some(config_promiscusity);
unsafe {
ffi::rte_kni_alloc(self.mempool, &self.conf, &mut self.ops)
.into_result(|_| DpdkError::new())
.map(Kni::new)
}
}
}
pub(crate) fn kni_init(max: usize) -> Result<()> {
unsafe {
ffi::rte_kni_init(max as raw::c_uint)
.into_result(DpdkError::from_errno)
.map(|_| ())
}
}
pub(crate) fn kni_close() {
unsafe {
ffi::rte_kni_close();
}
}