Struct nng::Aio

pub struct Aio { /* fields omitted */ }

An asynchronous I/O context.

Asynchronous operations are performed without blocking the calling application's threads. Instead, the application registers a “callback” function to be executed when the operation completes, whether successfully or not. The callback is executed exactly once for each operation.

The callback must not perform any blocking operations and must complete its execution quickly. If the callback does block, this can ultimately lead to an apparent "hang" or deadlock in the application.
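One common way to keep a callback short is to hand slow work to a separate thread and return immediately. The following is a minimal sketch of that idea using only the standard library; `Job`, `spawn_worker`, and `on_complete` are hypothetical names that are not part of this crate, and `on_complete` merely stands in for the body of a real callback.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical job type; a real program would carry the received message.
pub struct Job {
    pub sleep_ms: u64,
}

/// Spawns a worker thread that performs the slow, blocking part of each job.
/// Returns the sending half of the queue and the worker's join handle, which
/// yields the total number of milliseconds the worker slept.
pub fn spawn_worker() -> (Sender<Job>, JoinHandle<u64>) {
    let (tx, rx) = channel::<Job>();
    let handle = thread::spawn(move || {
        let mut total_ms = 0;
        // The loop ends once every `Sender` has been dropped.
        for job in rx {
            // Blocking is acceptable here because this is not the callback.
            thread::sleep(Duration::from_millis(job.sleep_ms));
            total_ms += job.sleep_ms;
        }
        total_ms
    });
    (tx, handle)
}

/// Stand-in for the body of a completion callback: it only enqueues the job
/// on an unbounded channel and returns right away.
pub fn on_complete(queue: &Sender<Job>, sleep_ms: u64) {
    // A send error means the worker is gone; it is ignored here for brevity.
    let _ = queue.send(Job { sleep_ms });
}
```

Because the callback passed to Aio::new must be `Sync`, a `Sender` captured by it may need to be wrapped (for example in a `Mutex`) on toolchains where `Sender` is not `Sync`.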

Example

A simple server that will sleep for the requested number of milliseconds before responding:

use std::time::Duration;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use nng::*;

const ADDRESS: &str = "inproc://nng/aio/example";
const WORKERS: usize = 10;

fn server() -> Result<()> {
    // Set up the server socket but don't listen for connections yet.
    let server = Socket::new(Protocol::Rep0)?;

    // Create all of the worker contexts. These do *not* represent the number
    // of threads that the REP socket will use.
    let workers: Vec<_> = (0..WORKERS)
        .map(|_| {
            let ctx = Context::new(&server)?;
            let ctx_clone = ctx.clone();

            // An actual program should have better error handling.
            let aio = Aio::new(move |aio, res| callback(&aio, &ctx_clone, res).unwrap())?;
            Ok((aio, ctx))
        })
        .collect::<Result<_>>()?;

    // Only after we have all of the workers do we start listening.
    server.listen(ADDRESS)?;

    // Now, start the workers.
    for (a, c) in &workers {
        c.recv(a)?;
    }

    // Now, do nothing and let the workers handle the jobs.
    std::thread::park();
    Ok(())
}

fn callback(aio: &Aio, ctx: &Context, res: AioResult) -> Result<()> {
    match res {
        // We successfully sent the reply, wait for a new request.
        AioResult::Send(Ok(_)) => ctx.recv(aio),

        // We successfully received a message.
        AioResult::Recv(Ok(m)) => {
            let ms = m.as_slice().read_u64::<LittleEndian>().unwrap();
            aio.sleep(Duration::from_millis(ms))
        },

        // We successfully slept.
        AioResult::Sleep(Ok(_)) => {
            // We could have hung on to the request `Message` to avoid an
            // allocation here.
            let _ = ctx.send(aio, Message::new()?)?;
            Ok(())
        },

        // Anything else is an error and an actual program should handle it.
        _ => panic!("Error in the AIO"),
    }
}

fn client(ms: u64) -> Result<()> {
    // Set up the client socket and connect to the server.
    let client = Socket::new(Protocol::Req0)?;
    client.dial(ADDRESS)?;

    // Create the message containing the number of milliseconds to sleep.
    let mut req = Message::new()?;
    req.write_u64::<LittleEndian>(ms).unwrap();

    // Send the request to the server and wait for a response.
    client.send(req)?;

    // This should block for approximately `ms` milliseconds as we wait for the
    // server to sleep.
    client.recv()?;

    Ok(())
}

Methods

impl Aio

pub fn new<F>(callback: F) -> Result<Self> where
    F: Fn(Aio, AioResult) + Sync + Send + 'static,

Creates a new asynchronous I/O handle.

The provided callback will be called on every single I/O event, successful or not. It is possible that the callback will be entered multiple times simultaneously.
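Since the callback can be entered from several threads at once, any state it captures has to be safe to share, which is what the `Fn + Sync + Send + 'static` bound enforces. The sketch below illustrates this with only the standard library: `counting_callback` and `invoke_concurrently` are hypothetical helpers, and the closure takes no arguments instead of the `(Aio, AioResult)` pair so that the example stays self-contained.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Builds a closure with the same `Fn + Send + Sync + 'static` bounds that
/// `Aio::new` requires. The shared counter is atomic, so simultaneous calls
/// cannot lose updates.
pub fn counting_callback(counter: Arc<AtomicUsize>) -> impl Fn() + Send + Sync + 'static {
    move || {
        counter.fetch_add(1, Ordering::SeqCst);
    }
}

/// Simulates simultaneous entry: calls `callback` from `threads` threads at
/// once, `calls` times on each thread, and waits for all of them to finish.
pub fn invoke_concurrently<F>(callback: F, threads: usize, calls: usize)
where
    F: Fn() + Send + Sync + 'static,
{
    let callback = Arc::new(callback);
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let cb = Arc::clone(&callback);
            thread::spawn(move || {
                for _ in 0..calls {
                    (*cb)();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

A plain `Cell` or `RefCell` in place of the atomic would be rejected by the compiler, because neither is `Sync`.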

Panicking

If the callback function panics, the program will log the panic if possible and then abort. Future Rustc versions will likely do the same for uncaught panics at FFI boundaries, so this library will produce the abort in order to keep things consistent. As such, the user is responsible for either having a callback that never panics or catching and handling the panic within the callback.
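One way to handle a panic within the callback is to wrap its body in `std::panic::catch_unwind`. The following is a minimal sketch, assuming the program is built with unwinding panics (it has no effect under `panic = "abort"`); `run_guarded` is a hypothetical helper, not part of this crate.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Runs `body` and converts a panic into an `Err` holding the panic message,
/// so the panic does not propagate to the caller.
pub fn run_guarded<F>(body: F) -> Result<(), String>
where
    F: FnOnce(),
{
    // `AssertUnwindSafe` asserts that any state touched by `body` is still
    // usable after a panic; the caller is responsible for that being true.
    panic::catch_unwind(AssertUnwindSafe(body)).map_err(|payload| {
        // Panic payloads are usually a `&str` or a `String`.
        if let Some(s) = payload.downcast_ref::<&str>() {
            (*s).to_string()
        } else if let Some(s) = payload.downcast_ref::<String>() {
            s.clone()
        } else {
            "unknown panic payload".to_string()
        }
    })
}
```

A callback could then call `run_guarded` around its real work and log or otherwise handle the `Err` case. Note that the default panic hook still prints the panic message to standard error before `catch_unwind` returns.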

pub fn set_timeout(&self, dur: Option<Duration>) -> Result<()>

Set the timeout of asynchronous operations.

This causes a timer to be started when the operation is actually started. If the timer expires before the operation is completed, then it is aborted with Error::TimedOut.

As most operations involve some context switching, it is usually a good idea to allow at least a few tens of milliseconds before timing them out: a timeout that is too small might not allow the operation to properly begin before giving up!

It is only valid to set the timeout when no operations are active.

pub fn sleep(&self, dur: Duration) -> Result<()>

Performs an asynchronous sleep operation.

If the sleep finishes completely, it will never return an error. If a timeout has been set and it is shorter than the duration of the sleep operation, the sleep operation will end early with Error::TimedOut.

This function will return immediately. If there is already an I/O operation in progress, this function will return Error::TryAgain.

pub fn wait(&self)

Blocks the current thread until the current asynchronous operation completes.

If there are no operations running then this function returns immediately. This function should not be called from within the completion callback.

pub fn cancel(&self)

Cancel the currently running I/O operation.

Trait Implementations

impl Clone for Aio

impl Eq for Aio

impl PartialEq<Aio> for Aio

impl Debug for Aio

impl Hash for Aio

Auto Trait Implementations

impl Send for Aio

impl Sync for Aio

impl Unpin for Aio

impl UnwindSafe for Aio

impl RefUnwindSafe for Aio

Blanket Implementations

impl<T> From<T> for T

impl<T, U> Into<U> for T where
    U: From<T>,

impl<T> ToOwned for T where
    T: Clone

type Owned = T

The resulting type after obtaining ownership.

impl<T, U> TryFrom<U> for T where
    U: Into<T>,

type Error = !

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

impl<T> Borrow<T> for T where
    T: ?Sized

impl<T> BorrowMut<T> for T where
    T: ?Sized

impl<T> Any for T where
    T: 'static + ?Sized