Struct nng::Aio[][src]

pub struct Aio { /* fields omitted */ }
Expand description

An asynchronous I/O context.

Asynchronous operations are performed without blocking calling application threads. Instead the application registers a “callback” function to be executed when the operation is complete (whether successfully or not). This callback will be executed exactly once.

The callback must not perform any blocking operations and must complete it’s execution quickly. If the callback does block, this can lead ultimately to an apparent “hang” or deadlock in the application.

Example

A simple server that will sleep for the requested number of milliseconds before responding:

use std::{convert::TryInto, time::Duration};
use nng::*;

const ADDRESS: &'static str = "inproc://nng/aio/example";
const WORKERS: usize = 10;

fn server() -> Result<()> {
    // Set up the server socket but don't listen for connections yet.
    let server = Socket::new(Protocol::Rep0)?;

    // Create all of the worker contexts. These do *not* represent the number
    // of threads that the REP socket will use.
    let workers: Vec<_> = (0..WORKERS)
        .map(|_| {
            let ctx = Context::new(&server)?;
            let ctx_clone = ctx.clone();

            // An actual program should have better error handling.
            let aio = Aio::new(move |aio, res| callback(&aio, &ctx_clone, res).unwrap())?;
            Ok((aio, ctx))
        })
        .collect::<Result<_>>()?;

    // Only after we have all of the workers do we start listening.
    server.listen(ADDRESS)?;

    // Now, start the workers.
    for (a, c) in &workers {
        c.recv(a)?;
    }

    // Now, do nothing and let the workers handle the jobs.
    std::thread::park();
    Ok(())
}

fn callback(aio: &Aio, ctx: &Context, res: AioResult) -> Result<()> {
    match res {
        // We successfully send the reply, wait for a new request.
        AioResult::Send(Ok(_)) => ctx.recv(aio),

        // We successfully received a message.
        AioResult::Recv(Ok(m)) => {
            let ms = u64::from_le_bytes(m[..].try_into().unwrap());
            aio.sleep(Duration::from_millis(ms))
        },

        // We successfully slept.
        AioResult::Sleep(Ok(_)) => {
            // We could have hung on to the request `Message` to avoid an allocation
            let _ = ctx.send(aio, Message::new())?;
            Ok(())
        },

        // Anything else is an error and an actual program should handle it.
        _ => panic!("Error in the AIO"),
    }
}

fn client(ms: u64) -> Result<()> {
    // Set up the client socket and connect to the server.
    let client = Socket::new(Protocol::Req0)?;
    client.dial(ADDRESS)?;

    // Send the request to the server and wait for a response.
    client.send(ms.to_le_bytes())?;

    // This should block for approximately `ms` milliseconds as we wait for the
    // server to sleep.
    client.recv()?;

    Ok(())
}

Implementations

Creates a new asynchronous I/O handle.

The provided callback will be called on every single I/O event, successful or not. It is possible that the callback will be entered multiple times simultaneously.

Errors
Panics

If the callback function panics, the program will log the panic if possible and then abort. Future Rustc versions will likely do the same for uncaught panics at FFI boundaries, so this library will produce the abort in order to keep things consistent. As such, the user is responsible for either having a callback that never panics or catching and handling the panic within the callback.

Set the timeout of asynchronous operations.

This causes a timer to be started when the operation is actually started. If the timer expires before the operation is completed, then it is aborted with TimedOut.

As most operations involve some context switching, it is usually a good idea to allow a least a few tens of milliseconds before timing them out as a too small timeout might not allow the operation to properly begin before giving up!

Errors

Begins a sleep operation on the Aio and returns immediately.

If the sleep finishes completely, it will never return an error. If a timeout has been set and it is shorter than the duration of the sleep operation, the sleep operation will end early with TimedOut.

Errors

Blocks the current thread until the current asynchronous operation completes.

If there are no operations running then this function returns immediately. This function should not be called from within the completion callback.

Cancel the currently running I/O operation.

Trait Implementations

Returns a copy of the value. Read more

Performs copy-assignment from source. Read more

Formats the value using the given formatter. Read more

Feeds this value into the given Hasher. Read more

Feeds a slice of this type into the given Hasher. Read more

This method tests for self and other values to be equal, and is used by ==. Read more

This method tests for !=.

Auto Trait Implementations

Blanket Implementations

Gets the TypeId of self. Read more

Immutably borrows from an owned value. Read more

Mutably borrows from an owned value. Read more

Performs the conversion.

Performs the conversion.

The resulting type after obtaining ownership.

Creates owned data from borrowed data, usually by cloning. Read more

🔬 This is a nightly-only experimental API. (toowned_clone_into)

Uses borrowed data to replace owned data, usually by cloning. Read more

The type returned in the event of a conversion error.

Performs the conversion.

The type returned in the event of a conversion error.

Performs the conversion.