#![deny(clippy::all)]
#![feature(extract_if)]
#![feature(hash_extract_if)]
use anyhow::Result;
use clap::{
Arg,
ArgMatches,
Command,
};
use client::TcpEchoClient;
use demikernel::{
LibOS,
LibOSName,
};
use server::TcpEchoServer;
use std::{
net::SocketAddr,
str::FromStr,
thread,
thread::JoinHandle,
time::Duration,
};
#[cfg(target_os = "windows")]
pub const AF_INET: i32 = windows::Win32::Networking::WinSock::AF_INET.0 as i32;
#[cfg(target_os = "windows")]
pub const SOCK_STREAM: i32 = windows::Win32::Networking::WinSock::SOCK_STREAM.0 as i32;
#[cfg(target_os = "linux")]
pub const AF_INET: i32 = libc::AF_INET;
#[cfg(target_os = "linux")]
pub const SOCK_STREAM: i32 = libc::SOCK_STREAM;
mod client;
mod server;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Debug)]
pub struct ProgramArguments {
run_mode: Option<String>,
addr: SocketAddr,
bufsize: Option<usize>,
nrequests: Option<usize>,
nclients: Option<usize>,
nthreads: Option<usize>,
log_interval: Option<u64>,
peer_type: String,
}
impl ProgramArguments {
pub fn new(app_name: &'static str, app_author: &'static str, app_about: &'static str) -> Result<Self> {
let matches: ArgMatches = Command::new(app_name)
.author(app_author)
.about(app_about)
.arg(
Arg::new("addr")
.long("address")
.value_parser(clap::value_parser!(String))
.required(true)
.value_name("ADDRESS:PORT")
.help("Sets socket address"),
)
.arg(
Arg::new("peer")
.long("peer")
.value_parser(clap::value_parser!(String))
.required(true)
.value_name("server|client")
.default_value("server")
.help("Sets peer type"),
)
.arg(
Arg::new("bufsize")
.long("bufsize")
.value_parser(clap::value_parser!(usize))
.required(false)
.value_name("SIZE")
.help("Sets buffer size"),
)
.arg(
Arg::new("nclients")
.long("nclients")
.value_parser(clap::value_parser!(usize))
.required(false)
.value_name("NUMBER")
.help("Sets number of clients (per thread)"),
)
.arg(
Arg::new("nthreads")
.long("nthreads")
.value_parser(clap::value_parser!(usize))
.required(false)
.value_name("NUMBER")
.help("Sets number of threads"),
)
.arg(
Arg::new("nrequests")
.long("nrequests")
.value_parser(clap::value_parser!(usize))
.required(false)
.value_name("NUMBER")
.help("Sets number of requests"),
)
.arg(
Arg::new("log")
.long("log")
.value_parser(clap::value_parser!(u64))
.required(false)
.value_name("INTERVAL")
.help("Enables logging"),
)
.arg(
Arg::new("run-mode")
.long("run-mode")
.value_parser(clap::value_parser!(String))
.required(false)
.value_name("sequential|concurrent")
.help("Sets run mode"),
)
.get_matches();
let addr: SocketAddr = {
let addr: &String = matches.get_one::<String>("addr").expect("missing address");
SocketAddr::from_str(addr)?
};
let mut args: ProgramArguments = ProgramArguments {
run_mode: None,
addr,
bufsize: None,
nrequests: None,
nclients: None,
nthreads: None,
log_interval: None,
peer_type: "server".to_string(),
};
if let Some(run_mode) = matches.get_one::<String>("run-mode") {
args.run_mode = Some(run_mode.to_string());
}
if let Some(bufsize) = matches.get_one::<usize>("bufsize") {
if *bufsize > 0 {
args.bufsize = Some(*bufsize);
}
}
if let Some(nrequests) = matches.get_one::<usize>("nrequests") {
if *nrequests > 0 {
args.nrequests = Some(*nrequests);
}
}
if let Some(nclients) = matches.get_one::<usize>("nclients") {
if *nclients > 0 {
args.nclients = Some(*nclients);
}
}
if let Some(nthreads) = matches.get_one::<usize>("nthreads") {
if *nthreads > 0 {
args.nthreads = Some(*nthreads);
}
}
if let Some(log_interval) = matches.get_one::<u64>("log") {
if *log_interval > 0 {
args.log_interval = Some(*log_interval);
}
}
if let Some(peer_type) = matches.get_one::<String>("peer") {
let ref mut this = args;
let peer_type = peer_type.to_string();
if peer_type != "server" && peer_type != "client" {
anyhow::bail!("invalid peer type");
} else {
this.peer_type = peer_type;
}
}
Ok(args)
}
}
fn start_server_thread(
libos_name: LibOSName,
addr: SocketAddr,
log_interval: Option<u64>,
) -> Result<JoinHandle<Result<()>>> {
Ok(thread::spawn(move || -> Result<()> {
let libos: LibOS = match LibOS::new(libos_name, None) {
Ok(libos) => libos,
Err(e) => anyhow::bail!("failed to initialize libos: {:?}", e.cause),
};
let mut server: TcpEchoServer = TcpEchoServer::new(libos, addr)?;
server.run(log_interval)
}))
}
fn start_client_thread(
libos_name: LibOSName,
nclients: usize,
nrequests: Option<usize>,
bufsize: usize,
run_mode: &String,
addr: SocketAddr,
log_interval: Option<u64>,
) -> Result<JoinHandle<Result<()>>> {
match run_mode.as_str() {
"sequential" => Ok(thread::spawn(move || -> Result<()> {
let libos: LibOS = match LibOS::new(libos_name, None) {
Ok(libos) => libos,
Err(e) => anyhow::bail!("failed to initialize libos: {:?}", e.cause),
};
let mut client: TcpEchoClient = TcpEchoClient::new(libos, bufsize, addr)?;
client.run_sequential(log_interval, nclients, nrequests)
})),
"concurrent" => Ok(thread::spawn(move || -> Result<()> {
let libos: LibOS = match LibOS::new(libos_name, None) {
Ok(libos) => libos,
Err(e) => anyhow::bail!("failed to initialize libos: {:?}", e.cause),
};
let mut client: TcpEchoClient = TcpEchoClient::new(libos, bufsize, addr)?;
client.run_concurrent(log_interval, nclients, nrequests)
})),
_ => anyhow::bail!("invalid run mode"),
}
}
fn main() -> Result<()> {
let args: ProgramArguments = ProgramArguments::new(
"tcp-echo",
"Pedro Henrique Penna <ppenna@microsoft.com>",
"Echoes TCP packets.",
)?;
let libos_name: LibOSName = match LibOSName::from_env() {
Ok(libos_name) => libos_name.into(),
Err(e) => anyhow::bail!("{:?}", e),
};
let mut threads = vec![];
match args.peer_type.as_str() {
"server" => {
let nthreads: usize = match libos_name {
LibOSName::Catnap => args.nthreads.unwrap_or(1),
_ => 1,
};
for _ in 0..nthreads {
if let Ok(handle) = start_server_thread(libos_name, args.addr, args.log_interval) {
threads.push(handle)
}
}
},
"client" => {
let run_mode: String = args.run_mode.ok_or(anyhow::anyhow!("missing run mode"))?;
for _ in 0..args.nthreads.unwrap_or(1) {
if let Ok(handle) = start_client_thread(
libos_name,
args.nclients.ok_or(anyhow::anyhow!("missing number of clients"))?,
args.nrequests,
args.bufsize.ok_or(anyhow::anyhow!("missing buffer size"))?,
&run_mode,
args.addr,
args.log_interval,
) {
threads.push(handle);
}
}
},
_ => todo!(),
}
for handle in threads {
handle.join().unwrap()?;
}
Ok(())
}