1use nng::{Aio, AioResult, Context, Message, Protocol, Socket};
10use std::{
11 convert::TryInto,
12 env, process, thread,
13 time::{Duration, Instant},
14};
15
/// Number of `(Aio, Context)` worker pairs the server creates, i.e. how many
/// requests it can have in flight at the same time.
const PARALLEL: usize = 128;
25
26fn main() -> Result<(), nng::Error> {
28 let args: Vec<_> = env::args().collect();
31
32 match &args[..] {
33 [_, t, url] if t == "server" => server(url),
34 [_, t, url, count] if t == "client" => client(url, count.parse().unwrap()),
35 _ => {
36 println!("Usage:\nasync server <url>\n or\nasync client <url> <ms>");
37 process::exit(1);
38 }
39 }
40}
41
42fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44 let s = Socket::new(Protocol::Req0)?;
45 s.dial(url)?;
46
47 let start = Instant::now();
48 s.send(ms.to_le_bytes())?;
49 s.recv()?;
50
51 let dur = Instant::now().duration_since(start);
52 let subsecs: u64 = dur.subsec_millis().into();
53 println!(
54 "Request took {} milliseconds",
55 dur.as_secs() * 1000 + subsecs
56 );
57
58 Ok(())
59}
60
61fn server(url: &str) -> Result<(), nng::Error> {
63 let s = Socket::new(Protocol::Rep0)?;
65
66 let workers: Vec<_> = (0..PARALLEL)
68 .map(|_| {
69 let ctx = Context::new(&s)?;
70 let ctx_clone = ctx.clone();
71 let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
72 Ok((aio, ctx))
73 })
74 .collect::<Result<_, nng::Error>>()?;
75
76 s.listen(url)?;
78
79 for (a, c) in &workers {
81 c.recv(a)?;
82 }
83
84 thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));
85
86 Ok(())
87}
88
89fn worker_callback(aio: Aio, ctx: &Context, res: AioResult) {
91 match res {
92 AioResult::Send(Ok(_)) => ctx.recv(&aio).unwrap(),
94
95 AioResult::Recv(Ok(m)) => {
97 let ms = u64::from_le_bytes(m[..].try_into().unwrap());
98 aio.sleep(Duration::from_millis(ms)).unwrap();
99 }
100
101 AioResult::Sleep(Ok(_)) => {
103 ctx.send(&aio, Message::new()).unwrap();
104 }
105
106 AioResult::Send(Err((_, e))) | AioResult::Recv(Err(e)) | AioResult::Sleep(Err(e)) => {
108 panic!("Error: {}", e)
109 }
110 }
111}