pub struct Aio { /* private fields */ }
An asynchronous I/O context.
Asynchronous operations are performed without blocking calling application threads. Instead the application registers a “callback” function to be executed when the operation is complete (whether successfully or not). This callback will be executed exactly once.
The callback must not perform any blocking operations and must complete its execution quickly. If the callback does block, this can ultimately lead to an apparent “hang” or deadlock in the application.
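One common way to keep a callback short is to have it hand the event to a worker thread and return at once. The following is a minimal sketch of that idea using only the standard library. `Event`, `spawn_worker`, `on_complete`, and `demo` are hypothetical names chosen for illustration and are not part of this crate. A real callback would receive the `Aio` and `AioResult` instead of the stand-in `Event`.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for the data a completion callback receives.
pub struct Event(pub u64);

/// Spawns a worker thread that performs the slow part of the processing and
/// returns the sending half of its queue together with the join handle.
pub fn spawn_worker() -> (Sender<Event>, JoinHandle<u64>) {
    let (tx, rx) = mpsc::channel::<Event>();
    let handle = thread::spawn(move || {
        let mut total = 0u64;
        // The loop ends once every `Sender` has been dropped.
        for Event(n) in rx {
            // Stands in for blocking work that must not run in the callback.
            thread::sleep(Duration::from_millis(1));
            total += n;
        }
        total
    });
    (tx, handle)
}

/// The "callback": it only enqueues the event. Sending on an unbounded
/// channel does not wait for the receiver.
pub fn on_complete(tx: &Sender<Event>, event: Event) {
    // A send error only means the worker has already shut down.
    let _ = tx.send(event);
}

/// Runs three events through the callback and returns the worker's total.
pub fn demo() -> u64 {
    let (tx, worker) = spawn_worker();
    for n in 1..=3 {
        on_complete(&tx, Event(n));
    }
    drop(tx); // Closing the channel lets the worker finish.
    worker.join().unwrap()
}
```

The design choice here is that all waiting happens on the worker thread, so the time spent inside the callback does not depend on how slow the processing is.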
§Example
A simple server that will sleep for the requested number of milliseconds before responding:
use std::{convert::TryInto, time::Duration};
use nng::*;

const ADDRESS: &'static str = "inproc://nng/aio/example";
const WORKERS: usize = 10;

fn server() -> Result<()> {
    // Set up the server socket but don't listen for connections yet.
    let server = Socket::new(Protocol::Rep0)?;

    // Create all of the worker contexts. These do *not* represent the number
    // of threads that the REP socket will use.
    let workers: Vec<_> = (0..WORKERS)
        .map(|_| {
            let ctx = Context::new(&server)?;
            let ctx_clone = ctx.clone();

            // An actual program should have better error handling.
            let aio = Aio::new(move |aio, res| callback(&aio, &ctx_clone, res).unwrap())?;
            Ok((aio, ctx))
        })
        .collect::<Result<_>>()?;

    // Only after we have all of the workers do we start listening.
    server.listen(ADDRESS)?;

    // Now, start the workers.
    for (a, c) in &workers {
        c.recv(a)?;
    }

    // Now, do nothing and let the workers handle the jobs.
    std::thread::park();
    Ok(())
}

fn callback(aio: &Aio, ctx: &Context, res: AioResult) -> Result<()> {
    match res {
        // We successfully sent the reply, wait for a new request.
        AioResult::Send(Ok(_)) => ctx.recv(aio),

        // We successfully received a message.
        AioResult::Recv(Ok(m)) => {
            let ms = u64::from_le_bytes(m[..].try_into().unwrap());
            aio.sleep(Duration::from_millis(ms))
        },

        // We successfully slept.
        AioResult::Sleep(Ok(_)) => {
            // We could have hung on to the request `Message` to avoid an allocation
            let _ = ctx.send(aio, Message::new())?;
            Ok(())
        },

        // Anything else is an error and an actual program should handle it.
        _ => panic!("Error in the AIO"),
    }
}

fn client(ms: u64) -> Result<()> {
    // Set up the client socket and connect to the server.
    let client = Socket::new(Protocol::Req0)?;
    client.dial(ADDRESS)?;

    // Send the request to the server and wait for a response.
    client.send(ms.to_le_bytes())?;

    // This should block for approximately `ms` milliseconds as we wait for the
    // server to sleep.
    client.recv()?;
    Ok(())
}
Implementations§
impl Aio
pub fn new<F>(callback: F) -> Result<Self>
Creates a new asynchronous I/O handle.
The provided callback will be called on every single I/O event, successful or not. It is possible that the callback will be entered multiple times simultaneously.
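Because the callback may run on several threads at once, any state it captures has to be safe to share. The sketch below illustrates this with an atomic counter, using only the standard library. `counting_callback` and `run_concurrently` are hypothetical helpers for illustration, and the closure takes no arguments, unlike a real `Aio` callback.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Builds a callback-like closure that counts how often it has been invoked.
/// The counter is atomic, so simultaneous invocations do not lose updates.
pub fn counting_callback(counter: Arc<AtomicUsize>) -> impl Fn() + Send + Sync + 'static {
    move || {
        counter.fetch_add(1, Ordering::SeqCst);
    }
}

/// Invokes one shared closure from `threads` threads, `calls` times each,
/// and returns the final count.
pub fn run_concurrently(threads: usize, calls: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let callback = Arc::new(counting_callback(Arc::clone(&counter)));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let cb = Arc::clone(&callback);
            thread::spawn(move || {
                for _ in 0..calls {
                    (*cb)();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::SeqCst)
}
```

For state that cannot be updated atomically, a `Mutex` would serve the same purpose, provided the lock is only held briefly so that the callback still returns quickly.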
§Errors
OutOfMemory: Insufficient memory available.
§Panics
If the callback function panics, the program will log the panic if possible and then abort. Future Rustc versions will likely do the same for uncaught panics at FFI boundaries, so this library will produce the abort in order to keep things consistent. As such, the user is responsible for either having a callback that never panics or catching and handling the panic within the callback.
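One way to handle a panic inside the callback, as described above, is to wrap the fallible part in `std::panic::catch_unwind` and turn the panic into an ordinary error value. This is a minimal sketch using only the standard library. `guarded` and `risky` are hypothetical names for illustration, and the `u64` input stands in for whatever the real callback processes.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Wraps `f` so that a panic inside it is caught and reported as `Err`
/// instead of unwinding into the caller.
pub fn guarded<F>(f: F) -> impl Fn(u64) -> Result<u64, String>
where
    F: Fn(u64) -> u64,
{
    move |input| {
        panic::catch_unwind(AssertUnwindSafe(|| f(input))).map_err(|payload| {
            // Panic payloads are usually a `&str` or a `String`.
            if let Some(s) = payload.downcast_ref::<&str>() {
                s.to_string()
            } else if let Some(s) = payload.downcast_ref::<String>() {
                s.clone()
            } else {
                "unknown panic".to_string()
            }
        })
    }
}

/// Hypothetical handler that panics on a zero input.
pub fn risky(ms: u64) -> u64 {
    if ms == 0 {
        panic!("zero duration");
    }
    ms * 2
}
```

With this wrapper, `guarded(risky)` returns an `Err` carrying the panic message for a zero input rather than unwinding. Note that `catch_unwind` only helps when the program is built with unwinding panics; it has no effect under `panic = "abort"`.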
Examples found in repository:
fn server(url: &str) -> Result<(), nng::Error> {
    // Create the socket
    let s = Socket::new(Protocol::Rep0)?;

    // Create all of the worker contexts
    let workers: Vec<_> = (0..PARALLEL)
        .map(|_| {
            let ctx = Context::new(&s)?;
            let ctx_clone = ctx.clone();
            let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
            Ok((aio, ctx))
        })
        .collect::<Result<_, nng::Error>>()?;

    // Only after we have the workers do we start listening.
    s.listen(url)?;

    // Now start all of the workers listening.
    for (a, c) in &workers {
        c.recv(a)?;
    }

    thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));

    Ok(())
}
pub fn set_timeout(&self, dur: Option<Duration>) -> Result<()>
Set the timeout of asynchronous operations.
This causes a timer to be started when the operation is actually started. If the timer expires before the operation is completed, then it is aborted with TimedOut.
As most operations involve some context switching, it is usually a good idea to allow at least a few tens of milliseconds before timing them out, as a timeout that is too small might not allow the operation to properly begin before giving up!
§Errors
IncorrectState: The Aio currently has a running operation.
pub fn sleep(&self, dur: Duration) -> Result<()>
Begins a sleep operation on the Aio and returns immediately.
If the sleep finishes completely, it will never return an error. If a timeout has been set and it is shorter than the duration of the sleep operation, the sleep operation will end early with TimedOut.
§Errors
IncorrectState: The Aio already has a running operation.
Examples found in repository:
fn worker_callback(aio: Aio, ctx: &Context, res: AioResult) {
    match res {
        // We successfully sent the message, wait for a new one.
        AioResult::Send(Ok(_)) => ctx.recv(&aio).unwrap(),

        // We successfully received a message.
        AioResult::Recv(Ok(m)) => {
            let ms = u64::from_le_bytes(m[..].try_into().unwrap());
            aio.sleep(Duration::from_millis(ms)).unwrap();
        }

        // We successfully slept.
        AioResult::Sleep(Ok(_)) => {
            ctx.send(&aio, Message::new()).unwrap();
        }

        // Anything else is an error and we will just panic.
        AioResult::Send(Err((_, e))) | AioResult::Recv(Err(e)) | AioResult::Sleep(Err(e)) => {
            panic!("Error: {}", e)
        }
    }
}