use core::{mem::MaybeUninit, num::NonZeroU32, ptr::NonNull};
use xdpilone::xdp::XdpDesc;
use xdpilone::{BufIdx, IfInfo, Socket, SocketConfig, Umem, UmemConfig};
/// Backing storage handed to the kernel as the UMEM packet area.
///
/// The region is 1 MiB large and aligned to 4096 bytes. The contents are
/// wrapped in `MaybeUninit` so the type itself makes no claim about the bytes
/// being initialised.
#[repr(align(4096))]
struct PacketMap(MaybeUninit<[u8; 1024 * 1024]>);
/// Floods one interface queue with copies of a single prepared frame and
/// prints throughput statistics to stderr.
///
/// Steps:
/// 1. Allocate a page-aligned, zeroed packet area and register it as UMEM.
/// 2. Create a socket on the requested interface/queue with a TX ring only.
/// 3. Prepare one frame (chunk index 1) and build a descriptor for it.
/// 4. Repeatedly enqueue that descriptor and reap completions until
///    `--packet-total` packets have been sent *and* completed.
///
/// # Panics
///
/// Panics when any of the setup steps (UMEM creation, socket creation, ring
/// mapping, bind) fails.
fn main() {
    let args = <Args as clap::Parser>::parse();

    // Zero the packet area instead of leaving it uninitialised: a `&mut [u8]`
    // is formed over one of its frames below, and padding bytes of the frame
    // may be transmitted verbatim.
    let alloc = Box::new(PacketMap(MaybeUninit::zeroed()));
    // The allocation is intentionally leaked so that it outlives every ring
    // and socket that refers to it.
    let mem = NonNull::new(Box::leak(alloc).0.as_mut_ptr())
        .expect("pointer derived from a Box is never null");

    // SAFETY: `mem` points to a leaked (hence never freed), 4096-byte aligned
    // allocation that nothing else in this program accesses except through
    // `umem`.
    let umem = unsafe { Umem::new(UmemConfig::default(), mem) }.expect("failed to create UMEM");
    let info = ifinfo(&args).expect("failed to resolve interface");
    let sock = Socket::with_shared(&info, &umem).expect("failed to create socket");
    let mut device = umem
        .fq_cq(&sock)
        .expect("failed to map fill/completion queues");

    let rxtx = umem
        .rx_tx(
            &sock,
            &SocketConfig {
                rx_size: None,
                tx_size: NonZeroU32::new(1 << 14),
                bind_flags: SocketConfig::XDP_BIND_NEED_WAKEUP,
            },
        )
        .expect("failed to configure rx/tx rings");

    assert!(rxtx.map_rx().is_err(), "did not provide a rx_size");
    let mut tx = rxtx.map_tx().expect("failed to map tx ring");
    umem.bind(&rxtx).expect("failed to bind socket");

    let desc = {
        let mut frame = umem
            .frame(BufIdx(1))
            .expect("UMEM has no frame with index 1");
        // SAFETY: the frame lies inside the zero-initialised packet area and
        // no descriptor referring to it has been queued yet, so this is the
        // only live reference to these bytes.
        prepare_buffer(frame.offset, unsafe { frame.addr.as_mut() }, &args)
    };

    eprintln!("Connection up!");

    let start = std::time::Instant::now();

    // A batch size of zero would never enqueue or reap anything and the loop
    // below would spin forever, so enforce a minimum of one.
    let batch: u32 = args.batch.unwrap_or(1 << 10).max(1);
    let total: u32 = args.total.unwrap_or(1 << 20);

    let mut sent = 0;
    let mut completed = 0;

    let mut stat_loops = 0;
    let mut stat_stall = 0;
    let mut stat_woken = 0;

    // Histograms indexed by the bit length of the per-iteration batch size
    // (0 for an empty batch, up to 32).
    let mut tx_log_batch = [0; 33];
    let mut cq_log_batch = [0; 33];

    eprintln!(
        "Dumping {} B with {} packets!",
        // Integer arithmetic keeps the byte count exact for any `u32` inputs.
        u64::from(total) * u64::from(desc.len),
        total
    );
    eprintln!("The description is {:?}", desc,);

    while !(sent == completed && sent == total) {
        // Enqueue as many copies of the descriptor as the ring accepts.
        let sent_now: u32 = {
            let send_batch = total.saturating_sub(sent).min(batch);
            let mut writer = tx.transmit(send_batch);
            let queued = writer.insert(core::iter::repeat(desc));
            writer.commit();
            queued
        };

        if tx.needs_wakeup() {
            tx.wake();
            stat_woken += 1;
        }

        // Reap completions, never asking for more than are outstanding.
        let comp_now: u32 = {
            let comp_batch = sent.saturating_sub(completed).min(batch);
            let mut reader = device.complete(comp_batch);
            let mut reaped = 0;
            while reader.read().is_some() {
                reaped += 1;
            }
            reader.release();
            reaped
        };

        if sent_now == 0 && comp_now == 0 {
            stat_stall += 1;
        }

        sent += sent_now;
        completed += comp_now;

        stat_loops += 1;
        tx_log_batch[log2_bucket(sent_now)] += 1;
        cq_log_batch[log2_bucket(comp_now)] += 1;
    }

    // `f64` represents every `u32` packet count exactly; `f32` does not.
    let secs = start.elapsed().as_secs_f64();
    let packets = f64::from(completed);
    let bytes = packets * f64::from(desc.len);

    eprintln!(
        "{:?} s; {} pkt; {} pkt/s; {} B/s; {} L1-B/s",
        secs,
        packets,
        packets / secs,
        bytes / secs,
        // 20 extra bytes per packet are added for the "L1" rate.
        // NOTE(review): presumably preamble + SFD + inter-frame gap; confirm.
        (bytes + packets * 20.) / secs,
    );

    eprintln!(
        "Statistics\nLoops: {}; stalled: {}; wake/sys-call: {}",
        stat_loops, stat_stall, stat_woken
    );

    eprintln!("Tx Batch size (log2): {:?}", tx_log_batch);
    eprintln!("Cq Batch size (log2): {:?}", cq_log_batch);
}

/// Histogram bucket for a batch size: the number of significant bits of
/// `count`, i.e. 0 for 0, 1 for 1, 2 for 2..=3, and so on up to 32.
fn log2_bucket(count: u32) -> usize {
    (32 - count.leading_zeros()) as usize
}
/// Writes the ARP template into `buffer` and returns a descriptor for it.
///
/// * `offset` - address of the frame, stored unchanged in the descriptor.
/// * `buffer` - the frame's memory; must be at least `ARP.len()` bytes long.
/// * `args`   - `args.length` optionally requests a larger packet; the bytes
///   after the template are zero padding.
///
/// The resulting length is never smaller than the template and never larger
/// than `buffer`. A descriptor that extends past its frame would be invalid
/// (the kernel is expected to drop it without ever reporting a completion),
/// so an oversized `--packet-length` is clamped to the frame size.
///
/// # Panics
///
/// Panics if `buffer` is shorter than the ARP template.
fn prepare_buffer(offset: u64, buffer: &mut [u8], args: &Args) -> XdpDesc {
    buffer[..ARP.len()].copy_from_slice(&ARP[..]);

    // `ARP.len()` is a small compile-time constant, the cast cannot truncate.
    let template_len = ARP.len() as u32;
    // Frames larger than `u32::MAX` cannot be described anyway; saturate.
    let frame_cap = u32::try_from(buffer.len()).unwrap_or(u32::MAX);

    // The copy above succeeded, hence `template_len <= frame_cap` and the
    // result lies within `template_len..=frame_cap`.
    let len = args.length.unwrap_or(0).max(template_len).min(frame_cap);

    // Give the padding a defined content instead of sending whatever bytes
    // happened to be in the frame.
    buffer[ARP.len()..len as usize].fill(0);

    XdpDesc {
        addr: offset,
        len,
        options: 0,
    }
}
// Command line arguments, parsed with clap's derive API.
//
// Plain `//` comments are used on purpose: `///` doc comments on a clap
// derive struct become part of the generated help output.
#[derive(clap::Parser)]
struct Args {
// Name of the network interface; looked up by name in `ifinfo`.
ifname: String,
// Optional queue of the interface to use (`--queue-id`); when absent the
// queue of the resolved interface info is left untouched.
#[arg(long = "queue-id")]
queue_id: Option<u32>,
// Upper bound on descriptors enqueued / completions reaped per loop
// iteration (`--batch-size`); `main` falls back to 1 << 10.
#[arg(long = "batch-size")]
batch: Option<u32>,
// Number of packets to send in total (`--packet-total`); `main` falls back
// to 1 << 20.
#[arg(long = "packet-total")]
total: Option<u32>,
// Requested packet length in bytes (`--packet-length`); values smaller than
// the ARP template have no effect.
#[arg(long = "packet-length")]
length: Option<u32>,
}
/// Resolves the interface named on the command line.
///
/// The name is looked up via `IfInfo::from_name`; when `--queue-id` was given
/// the queue of the resulting info is overridden with it.
///
/// # Errors
///
/// Returns the error reported by the interface lookup.
///
/// # Panics
///
/// Panics if the interface name contains an interior NUL byte.
fn ifinfo(args: &Args) -> Result<IfInfo, xdpilone::Errno> {
    // The lookup wants a C string, so build a NUL-terminated copy of the name.
    let raw: Vec<u8> = args
        .ifname
        .bytes()
        .chain(core::iter::once(0u8))
        .collect();
    let cname = core::ffi::CStr::from_bytes_with_nul(&raw).unwrap();

    let mut resolved = IfInfo::invalid();
    resolved.from_name(cname)?;

    match args.queue_id {
        Some(queue) => resolved.set_queue(queue),
        None => {}
    }

    Ok(resolved)
}
// Template frame copied into the transmit buffer: 14 bytes of Ethernet header
// followed by a 28 byte ARP payload. Field names below follow the standard
// ARP-over-Ethernet layout; all addresses are dummy values.
#[rustfmt::skip]
static ARP: [u8; 14+28] = [
// Ethernet destination address.
0x11, 0x12, 0x13, 0x14, 0x15, 0x16,
// Ethernet source address.
0x31, 0x32, 0x33, 0x34, 0x35, 0x36,
// EtherType 0x0806 (ARP).
0x08, 0x06,
// Hardware type 1 (Ethernet).
0x00, 0x01,
// Protocol type 0x0800 (IPv4), hardware address length 6, protocol address length 4.
0x08, 0x00, 0x06, 0x04,
// Operation 1 (request).
0x00, 0x01,
// Sender hardware address.
// NOTE(review): equals the Ethernet destination above rather than the
// source; confirm whether that is intended.
0x11, 0x12, 0x13, 0x14, 0x15, 0x16,
// Sender protocol address.
0x21, 0x22, 0x23, 0x24,
// Target hardware address.
0x31, 0x32, 0x33, 0x34, 0x35, 0x36,
// Target protocol address.
0x41, 0x42, 0x43, 0x44,
];