async/
async.rs

1//! A simple asynchronous RPC server example.
2//!
3//! This example shows how to write an asynchronous RPC server using Contexts.
4//! This is heavily based on the `nng` demonstration program of the same name.
5//!
6//! The protocol is simple: the client sends a request with the number of
7//! milliseconds to wait, the server waits that long and sends back an empty
8//! reply.
9use nng::{Aio, AioResult, Context, Message, Protocol, Socket};
10use std::{
11    convert::TryInto,
12    env, process, thread,
13    time::{Duration, Instant},
14};
15
16/// Number of outstanding requests that we can handle at a given time.
17///
18/// This is *NOT* the number of threads in use, but instead represents
19/// outstanding work items. Select a small number to reduce memory size. (Each
20/// one of these can be thought of as a request-reply loop.) Note that you will
21/// probably run into limitations on the number of open file descriptors if you
22/// set this too high. (If not for that limit, this could be set in the
23/// thousands, each context consumes a couple of KB.)
24const PARALLEL: usize = 128;
25
26/// Entry point of the application.
27fn main() -> Result<(), nng::Error> {
28    // Begin by parsing the arguments. We are either a server or a client, and
29    // we need an address and potentially a sleep duration.
30    let args: Vec<_> = env::args().collect();
31
32    match &args[..] {
33        [_, t, url] if t == "server" => server(url),
34        [_, t, url, count] if t == "client" => client(url, count.parse().unwrap()),
35        _ => {
36            println!("Usage:\nasync server <url>\n  or\nasync client <url> <ms>");
37            process::exit(1);
38        }
39    }
40}
41
42/// Run the client portion of the program.
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44    let s = Socket::new(Protocol::Req0)?;
45    s.dial(url)?;
46
47    let start = Instant::now();
48    s.send(ms.to_le_bytes())?;
49    s.recv()?;
50
51    let dur = Instant::now().duration_since(start);
52    let subsecs: u64 = dur.subsec_millis().into();
53    println!(
54        "Request took {} milliseconds",
55        dur.as_secs() * 1000 + subsecs
56    );
57
58    Ok(())
59}
60
61/// Run the server portion of the program.
62fn server(url: &str) -> Result<(), nng::Error> {
63    // Create the socket
64    let s = Socket::new(Protocol::Rep0)?;
65
66    // Create all of the worker contexts
67    let workers: Vec<_> = (0..PARALLEL)
68        .map(|_| {
69            let ctx = Context::new(&s)?;
70            let ctx_clone = ctx.clone();
71            let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
72            Ok((aio, ctx))
73        })
74        .collect::<Result<_, nng::Error>>()?;
75
76    // Only after we have the workers do we start listening.
77    s.listen(url)?;
78
79    // Now start all of the workers listening.
80    for (a, c) in &workers {
81        c.recv(a)?;
82    }
83
84    thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));
85
86    Ok(())
87}
88
89/// Callback function for workers.
90fn worker_callback(aio: Aio, ctx: &Context, res: AioResult) {
91    match res {
92        // We successfully sent the message, wait for a new one.
93        AioResult::Send(Ok(_)) => ctx.recv(&aio).unwrap(),
94
95        // We successfully received a message.
96        AioResult::Recv(Ok(m)) => {
97            let ms = u64::from_le_bytes(m[..].try_into().unwrap());
98            aio.sleep(Duration::from_millis(ms)).unwrap();
99        }
100
101        // We successfully slept.
102        AioResult::Sleep(Ok(_)) => {
103            ctx.send(&aio, Message::new()).unwrap();
104        }
105
106        // Anything else is an error and we will just panic.
107        AioResult::Send(Err((_, e))) | AioResult::Recv(Err(e)) | AioResult::Sleep(Err(e)) => {
108            panic!("Error: {}", e)
109        }
110    }
111}