use std::cmp::min;
use std::hint;
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time;
use std::{fmt::Display, io};
use arraydeque::{ArrayDeque, Wrapping};
use rlimit::{setrlimit, Resource, Rlim};
use afxdp::buf_pool::BufPool;
use afxdp::mmap_area::{MmapArea, MmapAreaOptions, MmapError};
use afxdp::socket::{Socket, SocketOptions, SocketRx, SocketTx};
use afxdp::umem::{Umem, UmemCompletionQueue, UmemFillQueue};
use afxdp::PENDING_LEN;
use afxdp::{buf_mmap::BufMmap, buf_pool_vec::BufPoolVec};
const RING_SIZE: u32 = 512;
const BUF_SIZE: usize = 2048;
const BUF_NUM: usize = 4096;
const SOCKET_BATCH_SIZE: usize = 64;
const FILL_THRESHOLD: usize = 64;
const FILL_BATCH_SIZE: usize = 128;
const SERVICE_THRESHOLD: usize = 64;
const SERVICE_BATCH_SIZE: usize = 128;
#[derive(Default, Copy, Clone)]
pub struct BufCustom {}
pub trait TrafficTest {
fn done(&mut self, now: time::Instant) -> bool;
fn transmit<'a>(
&mut self,
now: time::Instant,
port: usize,
bufs: &mut Vec<BufMmap<'a, BufCustom>>,
pending: &mut ArrayDeque<[BufMmap<'a, BufCustom>; PENDING_LEN], Wrapping>,
) -> usize;
fn receive<'a>(
&mut self,
now: time::Instant,
port: usize,
bufs: &mut Vec<BufMmap<'a, BufCustom>>,
pending: &mut ArrayDeque<[BufMmap<'a, BufCustom>; PENDING_LEN], Wrapping>,
) -> usize;
fn finalize(&self);
}
struct WorkerConfig<'a, T>
where
T: TrafficTest,
{
test: Arc<Mutex<T>>,
bp: BufPoolVec<BufMmap<'a, BufCustom>, BufCustom>,
port: usize, link_rx: SocketRx<'a, BufCustom>,
link_tx: SocketTx<'a, BufCustom>,
link_cq: UmemCompletionQueue<'a, BufCustom>,
link_fq: UmemFillQueue<'a, BufCustom>,
}
fn gen_worker_config<T>(
link_name: &str,
link_channel: usize,
port: usize, buf_size: usize,
buf_num: usize,
test: Arc<Mutex<T>>,
) -> WorkerConfig<T>
where
T: TrafficTest,
{
assert!(port <= 1);
let options: MmapAreaOptions = Default::default();
let r: Result<(std::sync::Arc<MmapArea<BufCustom>>, Vec<BufMmap<BufCustom>>), MmapError> =
MmapArea::new(buf_num, buf_size, options);
let (area, mut bufs) = match r {
Ok((area, bufs)) => (area, bufs),
Err(err) => panic!("Failed to create MmapArea: {:?}", err),
};
println!(
"Created MmapArea with {} buffers of size {} for a total of {} bytes",
bufs.len(),
buf_size,
bufs.len() * buf_size,
);
let mut bp: BufPoolVec<BufMmap<BufCustom>, BufCustom> = BufPoolVec::new(bufs.len());
let len = bufs.len();
let r = bp.put(&mut bufs, len);
assert!(r == len);
let r = Umem::new(area.clone(), RING_SIZE, RING_SIZE);
let (umem1, umem1cq, umem1fq) = match r {
Ok(umem) => umem,
Err(err) => panic!("Failed to create Umem: {:?}", err),
};
let options = SocketOptions::default();
let r = Socket::new(
umem1.clone(),
link_name,
link_channel,
RING_SIZE,
RING_SIZE,
options,
);
let (_skt1, skt1rx, skt1tx) = match r {
Ok(skt) => skt,
Err(err) => panic!(
"Failed to create socket for {}:{} - {:?}",
link_name, link_channel, err
),
};
WorkerConfig {
bp,
test,
port,
link_rx: skt1rx,
link_tx: skt1tx,
link_cq: umem1cq,
link_fq: umem1fq,
}
}
#[derive(Default, Debug, Clone, Copy)]
struct Stats {
cq_bufs_received: usize,
fq_bufs_filled: usize,
rx_packets: usize,
tx_packets: usize,
cq_deficit: usize,
fq_deficit: usize,
}
fn do_worker<T>(mut config: WorkerConfig<T>)
where
T: TrafficTest,
{
let total_bufs = config.bp.len();
let mut bufs = Vec::with_capacity(total_bufs);
let r = config.bp.get(&mut bufs, total_bufs);
if r != total_bufs {
println!(
"Failed to get initial bufs. Wanted {} got {}",
total_bufs, r
);
}
let initial_fill_num: usize = RING_SIZE as usize;
println!("Filling umem with {} buffers", initial_fill_num);
let r = config.link_fq.fill(&mut bufs, initial_fill_num);
match r {
Ok(n) => {
if n != initial_fill_num {
panic!(
"Initial fill of umem1 incomplete: {} of {}",
n, initial_fill_num
);
}
}
Err(err) => println!("error: {:?}", err),
}
let mut pending_receive: ArrayDeque<[BufMmap<BufCustom>; PENDING_LEN], Wrapping> =
ArrayDeque::new();
let mut pending_send: ArrayDeque<[BufMmap<BufCustom>; PENDING_LEN], Wrapping> =
ArrayDeque::new();
let mut stats: Stats = Default::default();
let bc = BufCustom {};
let mut now: time::Instant;
let mut did_work: bool;
loop {
now = time::Instant::now();
did_work = false;
if config.test.lock().unwrap().done(now) {
println!("pending_send: {}", pending_send.len());
println!("pending_receive: {}", pending_receive.len());
println!("stats.tx_packets: {}", stats.tx_packets);
println!("stats.rx_packets: {}", stats.rx_packets);
break;
}
if stats.cq_deficit >= SERVICE_THRESHOLD {
let r = config.link_cq.service(&mut bufs, SERVICE_BATCH_SIZE);
match r {
Ok(n) => {
stats.cq_bufs_received += n;
stats.cq_deficit -= n;
did_work = true;
}
Err(err) => panic!("error: {:?}", err),
}
}
let r = config
.link_rx
.try_recv(&mut pending_receive, SOCKET_BATCH_SIZE, bc);
match r {
Ok(n) => {
if n > 0 {
stats.rx_packets += n;
stats.fq_deficit += n;
did_work = true;
} else if config.link_fq.needs_wakeup() {
config.link_rx.wake();
}
}
Err(err) => {
panic!("error: {:?}", err);
}
}
config
.test
.lock()
.unwrap()
.receive(now, config.port, &mut bufs, &mut pending_receive);
config
.test
.lock()
.unwrap()
.transmit(now, config.port, &mut bufs, &mut pending_send);
let r = config
.link_tx
.try_send(&mut pending_send, SOCKET_BATCH_SIZE);
match r {
Ok(n) => {
if n > 0 {
stats.tx_packets += n;
stats.cq_deficit += n;
did_work = true;
}
}
Err(err) => println!("shouldn't happen: {:?}", err),
}
if stats.fq_deficit >= FILL_THRESHOLD {
let fill_num = min(FILL_BATCH_SIZE, stats.fq_deficit);
let r = config.link_fq.fill(&mut bufs, fill_num);
match r {
Ok(n) => {
stats.fq_bufs_filled += n;
stats.fq_deficit -= n;
did_work = true;
}
Err(err) => panic!("error: {:?}", err),
}
}
if !did_work {
hint::spin_loop();
}
}
}
pub fn run<T>(test: T)
where
T: 'static + TrafficTest + Send + Display,
{
let r = setrlimit(Resource::MEMLOCK, Rlim::INFINITY, Rlim::INFINITY);
match r {
Err(_) => {
writeln!(
&mut io::stdout(),
"Test skipped as it needs to be run as root"
)
.unwrap();
return;
}
Ok(_) => {
}
}
let test = Arc::new(Mutex::new(test));
let worker1_config = gen_worker_config("veth1", 0, 0, BUF_SIZE, BUF_NUM, test.clone());
let worker2_config = gen_worker_config("veth2", 0, 1, BUF_SIZE, BUF_NUM, test.clone());
let mut thread_handles = Vec::new();
let handle = thread::spawn(|| {
do_worker(worker1_config);
});
thread_handles.push(handle);
let handle = thread::spawn(|| {
do_worker(worker2_config);
});
thread_handles.push(handle);
for handle in thread_handles {
let r = handle.join();
match r {
Ok(_) => {}
Err(err) => println!("thread error: {:?}", err),
}
}
test.lock().unwrap().finalize();
}