#![deny(clippy::all)]
use ::anyhow::Result;
use ::clap::{
Arg,
ArgMatches,
Command,
};
use ::demikernel::{
demi_sgarray_t,
runtime::types::demi_opcode_t,
LibOS,
LibOSName,
QDesc,
QToken,
};
use ::std::{
net::SocketAddr,
slice,
str::FromStr,
time::{
Duration,
Instant,
},
};
/// Address family handed to `LibOS::socket()` (Windows build: taken from the WinSock bindings).
#[cfg(target_os = "windows")]
pub const AF_INET: i32 = windows::Win32::Networking::WinSock::AF_INET.0 as i32;
/// Socket type handed to `LibOS::socket()` (Windows build: taken from the WinSock bindings).
#[cfg(target_os = "windows")]
pub const SOCK_STREAM: i32 = windows::Win32::Networking::WinSock::SOCK_STREAM.0 as i32;
/// Address family handed to `LibOS::socket()` (Linux build: taken from `libc`).
#[cfg(target_os = "linux")]
pub const AF_INET: i32 = libc::AF_INET;
/// Socket type handed to `LibOS::socket()` (Linux build: taken from `libc`).
#[cfg(target_os = "linux")]
pub const SOCK_STREAM: i32 = libc::SOCK_STREAM;
/// Command-line arguments of the packet generator.
#[derive(Debug)]
pub struct ProgramArguments {
/// Socket address the generator connects to.
remote_socket_addr: SocketAddr,
/// Size, in bytes, of each buffer pushed to the socket.
bufsize_bytes: usize,
/// Time, in microseconds, that must elapse between two pushes. Despite the "rate" in its name,
/// the value is compared against elapsed time, i.e. it is used as an interval.
injection_rate_microseconds: u64,
}
impl ProgramArguments {
const DEFAULT_BUFSIZE_BYTES: usize = 1024;
const DEFAULT_INJECTION_RATE_MICROSECONDS: u64 = 100;
const DEFAULT_REMOTE_ADDR: &'static str = "127.0.0.1:23456";
pub fn new() -> Result<Self> {
let matches: ArgMatches = Command::new("tcp-pktgen")
.arg(
Arg::new("remote")
.long("remote")
.value_parser(clap::value_parser!(String))
.required(true)
.value_name("ADDRESS:PORT")
.help("Sets remote address"),
)
.arg(
Arg::new("bufsize")
.long("bufsize")
.value_parser(clap::value_parser!(String))
.required(true)
.value_name("SIZE")
.help("Sets buffer size"),
)
.arg(
Arg::new("injection_rate")
.long("injection_rate")
.value_parser(clap::value_parser!(String))
.required(true)
.value_name("RATE")
.help("Sets packet injection rate"),
)
.get_matches();
let mut args: ProgramArguments = ProgramArguments {
remote_socket_addr: SocketAddr::from_str(Self::DEFAULT_REMOTE_ADDR)?,
bufsize_bytes: Self::DEFAULT_BUFSIZE_BYTES,
injection_rate_microseconds: Self::DEFAULT_INJECTION_RATE_MICROSECONDS,
};
if let Some(addr) = matches.get_one::<String>("remote") {
args.set_remote_socket_addr(addr)?;
}
if let Some(bufsize_bytes) = matches.get_one::<String>("bufsize") {
args.set_bufsize(bufsize_bytes)?;
}
if let Some(injection_rate_microseconds) = matches.get_one::<String>("injection_rate") {
args.set_injection_rate(injection_rate_microseconds)?;
}
Ok(args)
}
pub fn get_remote_socket_addr(&self) -> SocketAddr {
self.remote_socket_addr
}
pub fn get_bufsize(&self) -> usize {
self.bufsize_bytes
}
pub fn get_injection_rate(&self) -> u64 {
self.injection_rate_microseconds
}
fn set_remote_socket_addr(&mut self, addr: &str) -> Result<()> {
self.remote_socket_addr = SocketAddr::from_str(addr)?;
Ok(())
}
fn set_bufsize(&mut self, bufsize_bytes: &str) -> Result<()> {
let bufsize_bytes: usize = bufsize_bytes.parse()?;
if bufsize_bytes > 0 {
self.bufsize_bytes = bufsize_bytes;
Ok(())
} else {
anyhow::bail!("invalid buffer size")
}
}
fn set_injection_rate(&mut self, injection_rate_microseconds: &str) -> Result<()> {
let injection_rate_microseconds: u64 = injection_rate_microseconds.parse()?;
if injection_rate_microseconds > 0 {
self.injection_rate_microseconds = injection_rate_microseconds;
Ok(())
} else {
anyhow::bail!("invalid injection rate")
}
}
}
/// Packet generator state: a connected socket plus the pacing parameters used when pushing.
struct Application {
/// LibOS instance used for every socket and scatter-gather array operation.
libos: LibOS,
/// Descriptor of the socket connected to the remote address.
sockqd: QDesc,
/// Size, in bytes, of each buffer pushed to the socket.
bufsize_bytes: usize,
/// Time, in microseconds, that must elapse after a push before the next one is issued.
injection_rate_microseconds: u64,
}
impl Application {
    /// Interval, in seconds, between two progress reports printed by [`Application::run`].
    const LOG_INTERVAL_SECONDS: u64 = 5;

    /// Creates a stream socket on `libos` and connects it to the remote address held by `args`.
    ///
    /// The connect operation is awaited with `wait(qt, None)` before this function returns.
    ///
    /// # Errors
    ///
    /// Fails when the socket cannot be created, when `connect()` cannot be issued, or when
    /// waiting on it fails or completes with an opcode other than `DEMI_OPC_CONNECT`. Once the
    /// socket exists, every failure path closes it (best effort) before returning.
    pub fn new(mut libos: LibOS, args: &ProgramArguments) -> Result<Self> {
        let remote_socket_addr: SocketAddr = args.get_remote_socket_addr();
        let bufsize_bytes: usize = args.get_bufsize();
        let injection_rate_microseconds: u64 = args.get_injection_rate();

        let sockqd: QDesc = match libos.socket(AF_INET, SOCK_STREAM, 0) {
            Ok(sockqd) => sockqd,
            Err(e) => anyhow::bail!("failed to create socket: {:?}", e),
        };

        let qt: QToken = match libos.connect(sockqd, remote_socket_addr) {
            Ok(qt) => qt,
            Err(e) => {
                Self::close_or_warn(&mut libos, sockqd);
                anyhow::bail!("failed to connect socket: {:?}", e)
            },
        };

        match libos.wait(qt, None) {
            Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_CONNECT => println!("connected!"),
            Ok(_) => {
                Self::close_or_warn(&mut libos, sockqd);
                anyhow::bail!("unexpected result")
            },
            Err(e) => {
                Self::close_or_warn(&mut libos, sockqd);
                anyhow::bail!("operation failed: {:?}", e)
            },
        }

        println!("Remote Address: {:?}", remote_socket_addr);

        Ok(Self {
            libos,
            sockqd,
            bufsize_bytes,
            injection_rate_microseconds,
        })
    }

    /// Runs the packet generator loop.
    ///
    /// Each iteration of the loop (there is no sleep; the loop spins):
    /// - prints the number of bytes pushed so far and the elapsed time once more than
    ///   `LOG_INTERVAL_SECONDS` have passed since the previous report;
    /// - pushes one buffer of `bufsize_bytes` bytes filled with `0x65` once more than
    ///   `injection_rate_microseconds` have passed since the previous push completed.
    ///
    /// # Errors
    ///
    /// The loop has no success exit: this function returns only when allocating, pushing or
    /// waiting fails. The scatter-gather array of the failed iteration is released first.
    pub fn run(&mut self) -> Result<()> {
        // Both intervals are fixed for the lifetime of the loop, so build them once.
        let log_interval: Duration = Duration::from_secs(Self::LOG_INTERVAL_SECONDS);
        let push_interval: Duration = Duration::from_micros(self.injection_rate_microseconds);

        let mut num_bytes: usize = 0;
        let start_time: Instant = Instant::now();
        let mut last_push_time: Instant = Instant::now();
        let mut last_log_time: Instant = Instant::now();

        loop {
            if last_log_time.elapsed() > log_interval {
                let elapsed_time: Duration = start_time.elapsed();
                println!("{:?} B / {:?} us", num_bytes, elapsed_time.as_micros());
                last_log_time = Instant::now();
            }

            if last_push_time.elapsed() > push_interval {
                let sga: demi_sgarray_t = self.mksga(self.bufsize_bytes, 0x65)?;
                let pushed: Result<()> = self.push_and_wait(&sga);

                // Capture the length before releasing the array, then release it exactly once
                // regardless of whether the push succeeded.
                let pushed_bytes: usize = sga.sga_segs[0].sgaseg_len as usize;
                self.free_sga_or_warn(sga);
                pushed?;

                num_bytes += pushed_bytes;
                last_push_time = Instant::now();
            }
        }
    }

    /// Allocates a scatter-gather array of `size` bytes and fills its first segment with `value`.
    ///
    /// # Errors
    ///
    /// Fails when the allocation fails or when the first segment is not exactly `size` bytes
    /// long; in the latter case the array is released before returning.
    fn mksga(&mut self, size: usize, value: u8) -> Result<demi_sgarray_t> {
        let sga: demi_sgarray_t = match self.libos.sgaalloc(size) {
            Ok(sga) => sga,
            Err(e) => anyhow::bail!("failed to allocate scatter-gather array: {:?}", e),
        };

        // Read everything needed from the descriptor up front so nothing is looked up after the
        // array has been handed back to the libos.
        let ptr: *mut u8 = sga.sga_segs[0].sgaseg_buf as *mut u8;
        let len: usize = sga.sga_segs[0].sgaseg_len as usize;

        if len != size {
            self.free_sga_or_warn(sga);
            anyhow::bail!(
                "failed to allocate scatter-gather array: expected size={:?} allocated size={:?}",
                size,
                len
            );
        }

        // SAFETY: relies on `sgaalloc()` returning a first segment whose `sgaseg_buf` points to
        // `sgaseg_len` writable bytes that nothing else accesses until the array is pushed or
        // freed. That is the allocator's contract and is not verified here.
        let buf: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
        buf.fill(value);

        Ok(sga)
    }

    /// Pushes `sga` to the socket and waits for the push to complete.
    ///
    /// Ownership of `sga` stays with the caller, who releases it whatever the outcome.
    fn push_and_wait(&mut self, sga: &demi_sgarray_t) -> Result<()> {
        let qt: QToken = match self.libos.push(self.sockqd, sga) {
            Ok(qt) => qt,
            Err(e) => anyhow::bail!("failed to push data to socket: {:?}", e),
        };

        match self.libos.wait(qt, None) {
            Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_PUSH => Ok(()),
            Ok(_) => anyhow::bail!("unexpected result"),
            Err(e) => anyhow::bail!("operation failed: {:?}", e),
        }
    }

    /// Releases `sga`; a failure is only logged because there is nothing else to do with it.
    fn free_sga_or_warn(&mut self, sga: demi_sgarray_t) {
        if let Err(e) = self.libos.sgafree(sga) {
            println!("ERROR: sgafree() failed (error={:?})", e);
            println!("WARN: leaking sga");
        }
    }

    /// Closes `sockqd`; a failure is only logged because there is nothing else to do with it.
    fn close_or_warn(libos: &mut LibOS, sockqd: QDesc) {
        if let Err(e) = libos.close(sockqd) {
            println!("ERROR: close() failed (error={:?})", e);
            println!("WARN: leaking sockqd={:?}", sockqd);
        }
    }
}
/// Closes the application's socket when the application goes out of scope.
impl Drop for Application {
    /// Closes `sockqd`. A failure is only logged, since `drop` cannot report an error.
    fn drop(&mut self) {
        if let Err(e) = self.libos.close(self.sockqd) {
            println!("ERROR: close() failed (error={:?})", e);
            println!("WARN: leaking sockqd={:?}", self.sockqd);
        }
    }
}
/// Program entry point.
///
/// Parses the command line, instantiates the LibOS named by `LibOSName::from_env()`, connects to
/// the remote address and runs the packet generator until it fails.
fn main() -> Result<()> {
    let args: ProgramArguments = ProgramArguments::new()?;

    let libos_name: LibOSName = LibOSName::from_env()
        .map_err(|e| anyhow::anyhow!("{:?}", e))?
        .into();

    let libos: LibOS =
        LibOS::new(libos_name, None).map_err(|e| anyhow::anyhow!("failed to initialize libos: {:?}", e))?;

    Application::new(libos, &args)?.run()
}