#![allow(dead_code)]
use quilkin_xdp::xdp::{
self,
nic::{NicIndex, NicName},
};
use std::sync::Arc;
pub mod process;
/// Selects which network interface (NIC) the XDP program is attached to.
pub enum NicConfig<'n> {
/// Look the NIC up by its interface name (eg. `eth0`).
Name(&'n str),
/// Use the NIC with this interface index directly.
Index(u32),
/// Auto-select; only succeeds when exactly one candidate interface exists.
Default,
}
/// User-facing configuration for setting up XDP-based packet I/O.
pub struct XdpConfig<'n> {
/// Which NIC to bind to. Defaults to auto-selection.
pub nic: NicConfig<'n>,
/// Port external (proxy) traffic is received on. Defaults to 7777.
pub external_port: u16,
/// Port QCMP traffic is received on. Defaults to 7600.
pub qcmp_port: u16,
/// Optional upper bound, in bytes, on the memory used for packet buffers.
/// When `None` a fixed default frame count is used instead.
pub maximum_packet_memory: Option<u64>,
/// If true, setup fails when the NIC does not support `XDP_ZEROCOPY`.
pub require_zero_copy: bool,
/// If true, setup fails when the NIC does not support TX checksum offload.
pub require_tx_checksum: bool,
}
impl Default for XdpConfig<'_> {
    /// The default configuration: auto-selected NIC, external port 7777,
    /// QCMP port 7600, no packet-memory cap, and no hard requirements on
    /// zero-copy or TX checksum offload support.
    fn default() -> Self {
        let nic = NicConfig::Default;
        Self {
            external_port: 7777,
            qcmp_port: 7600,
            maximum_packet_memory: None,
            require_tx_checksum: false,
            require_zero_copy: false,
            nic,
        }
    }
}
/// Fully bound XDP I/O state produced by [`setup_xdp_io`], ready to be
/// spawned onto worker threads via [`spawn`].
pub struct XdpWorkers {
// The loaded, but not yet attached, eBPF program.
ebpf_prog: quilkin_xdp::EbpfProgram,
// One worker per AF_XDP socket bound by `create_and_bind_sockets`.
workers: Vec<quilkin_xdp::XdpWorker>,
// The NIC the eBPF program will be attached to.
nic: NicIndex,
// External traffic port (`NetworkU16` => network byte order).
external_port: NetworkU16,
// QCMP port (`NetworkU16` => network byte order).
qcmp_port: NetworkU16,
// Local addresses handed to each I/O loop's processing state.
ipv6: std::net::Ipv6Addr,
ipv4: std::net::Ipv4Addr,
}
/// Reasons the configured NIC could not be resolved to a usable interface.
#[derive(thiserror::Error, Debug)]
pub enum NicUnavailable {
/// Enumerating or looking up interfaces failed at the OS level.
#[error("failed to query NIC: {0}")]
Query(#[source] std::io::Error),
/// Auto-selection found zero interfaces, or more than one distinct candidate.
#[error("no NICs available that could be considered a default")]
NoAvailableDefault,
/// No interface with the given name exists.
#[error("no NIC named '{0}'")]
UnknownName(String),
/// No interface with the given index exists.
#[error("no NIC with index '{0}'")]
UnknownIndex(u32),
}
#[derive(thiserror::Error, Debug)]
pub enum XdpSetupError {
#[error("NIC is unavailable: {0}")]
NicUnavailable(#[from] NicUnavailable),
#[error("failed to query device capabilities for {0}: {1}")]
NicQuery(NicName, #[source] std::io::Error),
#[error("failed to query ip addresses for {0}: {1}")]
AddressQuery(NicName, #[source] std::io::Error),
#[error("`XDP_ZEROCOPY` is unavailable for {0}")]
ZeroCopyUnavailable(NicName),
#[error("`XDP_TXMD_FLAGS_TIMESTAMP` is unavailable for {0}")]
TxChecksumUnavailable(NicName),
#[error(
"the requested maximum packet memory {max:.2}{xunit} must be at least {min:.2}{nunit} as {nic} has a queue count of {queue_count}"
)]
MinimumMemoryRequirementsExceeded {
max: f64,
xunit: &'static str,
min: f64,
nunit: &'static str,
nic: NicName,
queue_count: u32,
},
#[error("XDP error: {0}")]
Xdp(#[from] xdp::error::Error),
#[error("XDP load error: {0}")]
XdpLoad(#[from] quilkin_xdp::LoadError),
#[error("bind error: {0}")]
BindError(#[from] quilkin_xdp::BindError),
}
/// Errors that can occur while spawning the XDP I/O threads in [`spawn`].
#[derive(thiserror::Error, Debug)]
pub enum XdpSpawnError {
/// The OS failed to create a worker thread.
#[error("Failed to spawn worker thread: {0}")]
Thread(#[source] std::io::Error),
/// Attaching the eBPF program to the NIC failed.
#[error("Failed to attach XDP program: {0}")]
XdpAttach(#[from] quilkin_xdp::aya::programs::ProgramError),
}
pub fn setup_xdp_io(config: XdpConfig<'_>) -> Result<XdpWorkers, XdpSetupError> {
let nic_index = match config.nic {
NicConfig::Default => {
let mut chosen = None;
for iface in xdp::nic::InterfaceIter::new().map_err(NicUnavailable::Query)? {
if let Some(chosen) = chosen {
if iface != chosen {
return Err(NicUnavailable::NoAvailableDefault.into());
}
} else {
chosen = Some(iface);
}
}
chosen.ok_or(NicUnavailable::NoAvailableDefault)?
}
NicConfig::Name(name) => {
let cname = std::ffi::CString::new(name).unwrap();
xdp::nic::NicIndex::lookup_by_name(&cname)
.map_err(NicUnavailable::Query)?
.ok_or_else(|| NicUnavailable::UnknownName(name.to_owned()))?
}
NicConfig::Index(index) => xdp::nic::NicIndex::new(index),
};
let name = nic_index
.name()
.map_err(|_err| NicUnavailable::UnknownIndex(nic_index.into()))?;
tracing::info!(nic = ?nic_index, "selected NIC");
let device_caps = nic_index
.query_capabilities()
.map_err(|err| XdpSetupError::NicQuery(name, err))?;
tracing::debug!(?device_caps, nic = ?nic_index, "XDP features for device");
if config.require_zero_copy
&& matches!(device_caps.zero_copy, xdp::nic::XdpZeroCopy::Unavailable)
{
tracing::error!(?device_caps, nic = ?nic_index, "XDP features for device");
return Err(XdpSetupError::ZeroCopyUnavailable(name));
}
if config.require_tx_checksum && !device_caps.tx_metadata.checksum() {
tracing::error!(?device_caps, nic = ?nic_index, "XDP features for device");
return Err(XdpSetupError::TxChecksumUnavailable(name));
}
let (ipv4, ipv6) = nic_index
.addresses()
.and_then(|(ipv4, ipv6)| {
if ipv4.is_none() && ipv6.is_none() {
Err(std::io::Error::new(
std::io::ErrorKind::AddrNotAvailable,
"neither an ipv4 nor ipv6 address could be determined for the device",
))
} else {
Ok((
ipv4.unwrap_or(std::net::Ipv4Addr::new(0, 0, 0, 0)),
ipv6.unwrap_or(std::net::Ipv6Addr::from_bits(0)),
))
}
})
.map_err(|err| XdpSetupError::AddressQuery(name, err))?;
const MINIMUM_UMEM_COUNT: u64 = 128;
const PACKET_SIZE: u64 = 2 * 1024;
let packet_count = if let Some(max) = config.maximum_packet_memory {
let bytes_per_socket = max / device_caps.queue_count as u64;
let packet_count = (bytes_per_socket / PACKET_SIZE).next_power_of_two();
if MINIMUM_UMEM_COUNT > packet_count {
fn byte_units(b: u64) -> (f64, &'static str) {
let mut units = b as f64;
let mut unit = 0;
const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB"];
while units > 1024.0 {
units /= 1024.0;
unit += 1;
}
(units, UNITS[unit])
}
let (max, xunit) = byte_units(max);
let (min, nunit) =
byte_units(MINIMUM_UMEM_COUNT * PACKET_SIZE * device_caps.queue_count as u64);
return Err(XdpSetupError::MinimumMemoryRequirementsExceeded {
max,
xunit,
min,
nunit,
nic: name,
queue_count: device_caps.queue_count,
});
}
packet_count as u32
} else {
2 * 1024
};
let mut ebpf_prog = quilkin_xdp::EbpfProgram::load(config.external_port, config.qcmp_port)?;
let umem_cfg = xdp::umem::UmemCfgBuilder {
frame_size: xdp::umem::FrameSize::TwoK,
head_room: (xdp::packet::net_types::Ipv6Hdr::LEN - xdp::packet::net_types::Ipv4Hdr::LEN)
as u32,
frame_count: packet_count,
tx_checksum: device_caps.tx_metadata.checksum(),
..Default::default()
}
.build()?;
let ring_cfg = xdp::RingConfigBuilder::default().build()?;
let workers = ebpf_prog.create_and_bind_sockets(nic_index, umem_cfg, &device_caps, ring_cfg)?;
Ok(XdpWorkers {
ebpf_prog,
workers,
nic: nic_index,
external_port: config.external_port.into(),
qcmp_port: config.qcmp_port.into(),
ipv4,
ipv6,
})
}
/// Handle to the running XDP I/O threads; use [`XdpLoop::shutdown`] to stop.
pub struct XdpLoop {
// Join handles for the spawned `xdp-io-*` worker threads.
threads: Vec<std::thread::JoinHandle<()>>,
// Kept so the eBPF program stays loaded and can be detached on shutdown.
ebpf_prog: quilkin_xdp::EbpfProgram,
// Link id required to detach the attached program.
xdp_link: quilkin_xdp::aya::programs::xdp::XdpLinkId,
// Flag polled by each I/O loop; set to `true` to request exit.
shutdown: Arc<std::sync::atomic::AtomicBool>,
}
impl XdpLoop {
pub fn shutdown(mut self, wait: bool) {
if let Err(error) = self.ebpf_prog.detach(self.xdp_link) {
tracing::error!(%error, "failed to detach eBPF program");
}
self.shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if !wait {
return;
}
for jh in self.threads {
if let Err(error) = jh.join() {
if let Some(error) = error.downcast_ref::<&'static str>() {
tracing::error!(error, "XDP I/O thread enountered error");
} else if let Some(error) = error.downcast_ref::<String>() {
tracing::error!(error, "XDP I/O thread enountered error");
} else {
tracing::error!(?error, "XDP I/O thread enountered error");
};
}
}
}
}
/// Spawns one I/O thread per bound worker, then attaches the eBPF program to
/// the NIC, returning an [`XdpLoop`] handle for shutting everything down.
///
/// # Errors
///
/// - [`XdpSpawnError::Thread`] if an OS thread could not be spawned.
/// - [`XdpSpawnError::XdpAttach`] if attaching the eBPF program fails.
pub fn spawn(workers: XdpWorkers, config: process::ConfigState) -> Result<XdpLoop, XdpSpawnError> {
    let external_port = workers.external_port;
    let qcmp_port = workers.qcmp_port;
    let ipv4 = workers.ipv4;
    let ipv6 = workers.ipv6;
    let session_state = Arc::new(process::SessionState::default());
    let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let mut threads = Vec::with_capacity(workers.workers.len());
    for (i, mut worker) in workers.workers.into_iter().enumerate() {
        let cfg = config.clone();
        let ss = session_state.clone();
        let shutdown = shutdown.clone();
        let jh = std::thread::Builder::new()
            .name(format!("xdp-io-{i}"))
            .spawn(move || {
                // Pre-populate the fill ring so the kernel has buffers to
                // receive into before the loop's first poll.
                unsafe {
                    if let Err(error) = worker.fill.enqueue(&mut worker.umem, BATCH_SIZE * 2, true)
                    {
                        tracing::error!(%error, "failed to kick fill ring during initial spinup");
                    }
                };
                io_loop(
                    worker,
                    external_port,
                    qcmp_port,
                    cfg,
                    ss,
                    ipv4,
                    ipv6,
                    // Fixed: the per-thread clone above is already owned by
                    // this closure and unused afterwards, so move it instead
                    // of cloning again.
                    shutdown,
                );
            })
            .map_err(XdpSpawnError::Thread)?;
        threads.push(jh);
    }
    // The program is attached only after all worker threads are running.
    let mut ebpf_prog = workers.ebpf_prog;
    let xdp_link =
        ebpf_prog.attach(workers.nic, quilkin_xdp::aya::programs::XdpFlags::default())?;
    Ok(XdpLoop {
        threads,
        ebpf_prog,
        xdp_link,
        shutdown,
    })
}
/// Maximum number of packets received and processed per I/O loop iteration.
const BATCH_SIZE: usize = 64;
use xdp::packet::net_types::NetworkU16;
use crate::time::UtcTimestamp;
/// The main packet I/O loop run on each `xdp-io-*` thread: polls the AF_XDP
/// socket, receives packets, processes them via [`process::process_packets`],
/// sends the resulting packets, and recycles completed TX frames — until
/// `shutdown` is set.
#[allow(clippy::too_many_arguments)]
fn io_loop(
    worker: quilkin_xdp::XdpWorker,
    external_port: NetworkU16,
    qcmp_port: NetworkU16,
    mut config: process::ConfigState,
    sessions: Arc<process::SessionState>,
    local_ipv4: std::net::Ipv4Addr,
    local_ipv6: std::net::Ipv6Addr,
    shutdown: Arc<std::sync::atomic::AtomicBool>,
) {
    // Destructure the worker so each ring/umem can be borrowed independently.
    let quilkin_xdp::XdpWorker {
        mut umem,
        socket,
        mut fill,
        mut rx,
        mut tx,
        mut completion,
    } = worker;
    // Bounded poll so the shutdown flag is re-checked at least every 100ms.
    const POLL_TIMEOUT: xdp::socket::PollTimeout =
        xdp::socket::PollTimeout::new(Some(std::time::Duration::from_millis(100)));
    let mut state = process::State {
        external_port,
        qcmp_port,
        destinations: Vec::with_capacity(1),
        addr_to_asn: Default::default(),
        sessions,
        local_ipv4,
        local_ipv6,
        last_receive: UtcTimestamp::now(),
    };
    use xdp::slab::Slab;
    // RX batch, and a TX slab 4x as large since one received packet may fan
    // out into multiple sends — TODO(review): confirm the 4x sizing rationale.
    let mut rx_slab = xdp::slab::StackSlab::<BATCH_SIZE>::new();
    let mut tx_slab = xdp::slab::StackSlab::<{ BATCH_SIZE << 2 }>::new();
    let mut pending_sends = 0;
    // SAFETY: ring enqueue/dequeue are unsafe in the xdp crate; this thread
    // is presumed the sole owner of this worker's rings/umem — TODO confirm.
    unsafe {
        while !shutdown.load(std::sync::atomic::Ordering::Relaxed) {
            // Poll errors and timeouts both just retry (and re-check shutdown).
            let Ok(true) = socket.poll_read(POLL_TIMEOUT) else {
                continue;
            };
            let recvd = rx.recv(&umem, &mut rx_slab);
            // Top the fill ring back up to 2x batch size for the next recv.
            if let Err(error) = fill.enqueue(&mut umem, BATCH_SIZE * 2 - recvd, true) {
                crate::metrics::errors_total(
                    crate::metrics::Direction::Read,
                    &io_error_to_discriminant(error),
                    &crate::metrics::EMPTY,
                )
                .inc();
            }
            process::process_packets(
                &mut rx_slab,
                &mut umem,
                &mut tx_slab,
                &mut config,
                &mut state,
            );
            let before = tx_slab.len();
            let enqueued_sends = match tx.send(&mut tx_slab, true) {
                Ok(es) => es,
                Err(error) => {
                    // NOTE(review): this is the send path but the metric is
                    // recorded as Direction::Read — confirm this is intended.
                    crate::metrics::errors_total(
                        crate::metrics::Direction::Read,
                        &io_error_to_discriminant(error),
                        &crate::metrics::EMPTY,
                    )
                    .inc();
                    // Count however many entries send() consumed before failing.
                    before - tx_slab.len()
                }
            };
            // Reclaim frames the kernel has finished transmitting.
            pending_sends += enqueued_sends;
            pending_sends -= completion.dequeue(&mut umem, pending_sends);
        }
    }
}
/// Maps an I/O error to a short, stable label suitable for a metrics tag:
/// the errno name for the common AF_XDP errors, the numeric errno for any
/// other OS error, or the error's display text when no errno is attached.
#[inline]
fn io_error_to_discriminant(error: std::io::Error) -> std::borrow::Cow<'static, str> {
    use std::borrow::Cow;
    match error.raw_os_error() {
        Some(libc::EBUSY) => Cow::Borrowed("EBUSY"),
        Some(libc::ENOBUFS) => Cow::Borrowed("ENOBUFS"),
        Some(libc::EAGAIN) => Cow::Borrowed("EAGAIN"),
        Some(libc::ENETDOWN) => Cow::Borrowed("ENETDOWN"),
        Some(other) => Cow::Owned(other.to_string()),
        None => Cow::Owned(error.to_string()),
    }
}