use arraydeque::{ArrayDeque, Wrapping};
use rlimit::{setrlimit, Resource, Rlim};
use std::cmp::min;
use structopt::StructOpt;
use afxdp::buf::Buf;
use afxdp::buf_mmap::BufMmap;
use afxdp::mmap_area::{MmapArea, MmapAreaOptions};
use afxdp::socket::{Socket, SocketOptions, SocketRx};
use afxdp::umem::{Umem, UmemFillQueue};
use afxdp::PENDING_LEN;
use libbpf_sys::{XSK_RING_CONS__DEFAULT_NUM_DESCS, XSK_RING_PROD__DEFAULT_NUM_DESCS};
use pnet::packet::ethernet::EthernetPacket;
fn dump<'a>(
input: &mut ArrayDeque<[BufMmap<'a, BufCustom>; PENDING_LEN], Wrapping>,
buf_pool: &mut Vec<BufMmap<'a, BufCustom>>,
count: &mut usize,
) {
if input.is_empty() {
return;
}
*count = *count + input.len();
for buf in input.drain(0..) {
let ethernet = EthernetPacket::new(buf.get_data());
println!("{:?}", ethernet);
buf_pool.push(buf);
}
}
struct State<'a> {
fq: UmemFillQueue<'a, BufCustom>,
rx: SocketRx<'a, BufCustom>,
fq_deficit: usize,
}
#[derive(Default, Debug, Clone, Copy)]
struct Stats {
cq_bufs: usize,
fq_bufs: usize,
rx_packets: usize,
tx_packets: usize,
}
#[derive(Default, Copy, Clone)]
struct BufCustom {}
#[derive(StructOpt, Debug)]
#[structopt(name = "dump")]
struct Opt {
#[structopt(long, default_value = "2048")]
bufsize: usize,
#[structopt(long, default_value = "65536")]
bufnum: usize,
#[structopt(long, default_value = "64")]
batch_size: usize,
#[structopt(long)]
link_name: std::string::String,
#[structopt(long, default_value = "0")]
link_channel: usize,
#[structopt(long)]
huge_tlb: bool,
#[structopt(long)]
zero_copy: bool,
#[structopt(long, conflicts_with = "zero-copy")]
copy: bool,
}
fn main() {
let opt = Opt::from_args();
assert!(setrlimit(Resource::MEMLOCK, Rlim::INFINITY, Rlim::INFINITY).is_ok());
let options: MmapAreaOptions;
if opt.huge_tlb {
options = MmapAreaOptions { huge_tlb: true };
} else {
options = MmapAreaOptions { huge_tlb: false };
}
let r = MmapArea::new(opt.bufnum, opt.bufsize, options);
let (area, mut bufs) = match r {
Ok((area, bufs)) => (area, bufs),
Err(err) => panic!("no mmap for you: {:?}", err),
};
assert!(bufs.len() == opt.bufnum);
println!("Total buffers={} buffer size={}", opt.bufnum, opt.bufsize);
let r = Umem::new(
area.clone(),
XSK_RING_CONS__DEFAULT_NUM_DESCS,
XSK_RING_PROD__DEFAULT_NUM_DESCS,
);
let (umem1, _, mut umem1fq) = match r {
Ok(umem) => umem,
Err(err) => panic!("no umem for you: {:?}", err),
};
let mut options = SocketOptions::default();
options.zero_copy_mode = opt.zero_copy;
options.copy_mode = opt.copy;
let r = Socket::new_rx(
umem1.clone(),
&opt.link_name,
opt.link_channel,
XSK_RING_CONS__DEFAULT_NUM_DESCS,
options,
);
let (_, skt1rx) = match r {
Ok(skt) => skt,
Err(err) => panic!("no socket for you: {:?}", err),
};
let fill_size = min(XSK_RING_PROD__DEFAULT_NUM_DESCS as usize, opt.bufnum);
println!("Adding {} buffers to the fill queue", fill_size);
let r = umem1fq.fill(&mut bufs, fill_size);
match r {
Ok(n) => {
if n != fill_size {
panic!(
"Initial fill of umem incomplete. Wanted {} got {}.",
opt.bufnum, n
);
}
}
Err(err) => panic!("error: {:?}", err),
}
let mut v: ArrayDeque<[BufMmap<BufCustom>; PENDING_LEN], Wrapping> = ArrayDeque::new();
let mut state = State {
fq: umem1fq,
rx: skt1rx,
fq_deficit: 0,
};
let mut stats: Stats = Default::default();
let custom = BufCustom {};
loop {
let r = state.rx.try_recv(&mut v, opt.batch_size, custom);
match r {
Ok(n) => {
if n > 0 {
stats.rx_packets += n;
state.fq_deficit += n;
} else if state.fq.needs_wakeup() {
state.rx.wake();
}
}
Err(err) => {
panic!("error: {:?}", err);
}
}
let mut count: usize = 0;
dump(&mut v, &mut bufs, &mut count);
if bufs.len() == 0 {
println!("oops");
}
const FILL_THRESHOLD: usize = 64;
if state.fq_deficit >= FILL_THRESHOLD {
let r = state.fq.fill(&mut bufs, state.fq_deficit);
match r {
Ok(n) => {
if n != FILL_THRESHOLD {
println!("under filled got {} wanted {}", n, FILL_THRESHOLD);
}
stats.fq_bufs += n;
state.fq_deficit -= n;
}
Err(err) => panic!("error: {:?}", err),
}
}
}
}