extern crate nng;
extern crate byteorder;
use std::{env, thread, process, mem};
use std::time::{Duration, Instant};
use nng::{Socket, Protocol, Message};
use nng::aio::{Aio, Context};
use byteorder::{ByteOrder, LittleEndian};
const PARALLEL: usize = 128;
fn main() -> Result<(), nng::Error>
{
let args: Vec<_> = env::args().collect();
match &args[..] {
[_, t, url] if t == "server" => server(url),
[_, t, url, count] if t == "client" => client(url, count.parse().unwrap()),
_ => {
println!("Usage:\nasync server <url>\n or\nasync client <url> <ms>");
process::exit(1);
}
}
}
fn client(url: &str, ms: u64) -> Result<(), nng::Error>
{
let mut s = Socket::new(Protocol::Req0)?;
s.dial(url)?;
let mut req = Message::zeros(mem::size_of::<u64>())?;
LittleEndian::write_u64(&mut req, ms);
let start = Instant::now();
s.send(req)?;
s.recv()?;
let dur = Instant::now().duration_since(start);
println!("Request took {} milliseconds", dur.as_secs() * 1000 + dur.subsec_millis() as u64);
Ok(())
}
fn server(url: &str) -> Result<(), nng::Error>
{
let mut s = Socket::new(Protocol::Rep0)?;
let workers: Vec<_> =
(0..PARALLEL)
.map(|_| create_worker(&s))
.collect::<Result<_, _>>()?;
s.listen(url)?;
for (a, c) in &workers {
a.recv(c)?;
}
thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));
Ok(())
}
fn create_worker(s: &Socket) -> Result<(Aio, Context), nng::Error>
{
let mut state = State::Recv;
let ctx = Context::new(s)?;
let ctx_clone = ctx.clone();
let aio = Aio::with_callback(move |aio| worker_callback(aio, &ctx_clone, &mut state))?;
Ok((aio, ctx))
}
fn worker_callback(aio: &Aio, ctx: &Context, state: &mut State)
{
let new_state = match *state {
State::Recv => {
let _ = aio.result().unwrap();
let msg = aio.get_msg().unwrap();
let ms = LittleEndian::read_u64(&msg);
aio.sleep(Duration::from_millis(ms)).unwrap();
State::Wait
},
State::Wait => {
let msg = Message::new().unwrap();
aio.send(ctx, msg).unwrap();
State::Send
},
State::Send => {
let _ = aio.result().unwrap();
aio.recv(ctx).unwrap();
State::Recv
}
};
*state = new_state;
}
#[derive(Copy, Clone)]
enum State { Recv, Wait, Send }