use nng::{Aio, AioResult, Context, Message, Protocol, Socket};
use std::{
convert::TryInto,
env, process, thread,
time::{Duration, Instant},
};
const PARALLEL: usize = 128;
fn main() -> Result<(), nng::Error> {
let args: Vec<_> = env::args().collect();
match &args[..] {
[_, t, url] if t == "server" => server(url),
[_, t, url, count] if t == "client" => client(url, count.parse().unwrap()),
_ => {
println!("Usage:\nasync server <url>\n or\nasync client <url> <ms>");
process::exit(1);
}
}
}
fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
let s = Socket::new(Protocol::Req0)?;
s.dial(url)?;
let start = Instant::now();
s.send(ms.to_le_bytes())?;
s.recv()?;
let dur = Instant::now().duration_since(start);
let subsecs: u64 = dur.subsec_millis().into();
println!(
"Request took {} milliseconds",
dur.as_secs() * 1000 + subsecs
);
Ok(())
}
fn server(url: &str) -> Result<(), nng::Error> {
let s = Socket::new(Protocol::Rep0)?;
let workers: Vec<_> = (0..PARALLEL)
.map(|_| {
let ctx = Context::new(&s)?;
let ctx_clone = ctx.clone();
let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
Ok((aio, ctx))
})
.collect::<Result<_, nng::Error>>()?;
s.listen(url)?;
for (a, c) in &workers {
c.recv(a)?;
}
thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));
Ok(())
}
fn worker_callback(aio: Aio, ctx: &Context, res: AioResult) {
match res {
AioResult::Send(Ok(_)) => ctx.recv(&aio).unwrap(),
AioResult::Recv(Ok(m)) => {
let ms = u64::from_le_bytes(m[..].try_into().unwrap());
aio.sleep(Duration::from_millis(ms)).unwrap();
}
AioResult::Sleep(Ok(_)) => {
ctx.send(&aio, Message::new()).unwrap();
}
AioResult::Send(Err((_, e))) | AioResult::Recv(Err(e)) | AioResult::Sleep(Err(e)) => {
panic!("Error: {}", e)
}
}
}