Struct Aio

pub struct Aio { /* private fields */ }

An asynchronous I/O context.

Asynchronous operations are performed without blocking the calling application's threads. Instead, the application registers a “callback” function to be executed when the operation completes (whether successfully or not). This callback will be executed exactly once.

The callback must not perform any blocking operations and must complete its execution quickly. If the callback does block, the application can ultimately appear to “hang” or deadlock.
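One way to keep a callback short is to hand slow work to another thread. The sketch below uses only the standard library and is not part of the nng API: `Job`, `spawn_worker`, and `on_complete` are hypothetical names, and `on_complete` merely stands in for the body of a completion callback.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical job type: the payload a completion callback wants processed.
pub type Job = Vec<u8>;

/// Spawns a worker thread that performs the slow part of the processing.
/// Returns the sending half (to be moved into the callback) and a join handle
/// that yields the total number of bytes the worker processed.
pub fn spawn_worker() -> (Sender<Job>, JoinHandle<usize>) {
    let (tx, rx): (Sender<Job>, Receiver<Job>) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut total = 0;
        // The loop ends once every `Sender` has been dropped.
        for job in rx {
            // Slow or blocking work belongs here, not in the callback.
            total += job.len();
        }
        total
    });
    (tx, handle)
}

/// Stand-in for the body of a completion callback: queue the work and return.
pub fn on_complete(tx: &Sender<Job>, payload: Job) {
    // Sending on an unbounded channel does not block. An error only means
    // the worker has already exited, which this sketch ignores.
    let _ = tx.send(payload);
}
```

A real callback passed to `Aio::new` must also be `Sync`. Depending on the Rust version, that may require wrapping the sender in a `Mutex` or using a different queue type.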

§Example

A simple server that will sleep for the requested number of milliseconds before responding:

use std::{convert::TryInto, time::Duration};
use nng::*;

const ADDRESS: &str = "inproc://nng/aio/example";
const WORKERS: usize = 10;

fn server() -> Result<()> {
    // Set up the server socket but don't listen for connections yet.
    let server = Socket::new(Protocol::Rep0)?;

    // Create all of the worker contexts. These do *not* represent the number
    // of threads that the REP socket will use.
    let workers: Vec<_> = (0..WORKERS)
        .map(|_| {
            let ctx = Context::new(&server)?;
            let ctx_clone = ctx.clone();

            // An actual program should have better error handling.
            let aio = Aio::new(move |aio, res| callback(&aio, &ctx_clone, res).unwrap())?;
            Ok((aio, ctx))
        })
        .collect::<Result<_>>()?;

    // Only after we have all of the workers do we start listening.
    server.listen(ADDRESS)?;

    // Now, start the workers.
    for (a, c) in &workers {
        c.recv(a)?;
    }

    // Now, do nothing and let the workers handle the jobs.
    std::thread::park();
    Ok(())
}

fn callback(aio: &Aio, ctx: &Context, res: AioResult) -> Result<()> {
    match res {
        // We successfully sent the reply, wait for a new request.
        AioResult::Send(Ok(_)) => ctx.recv(aio),

        // We successfully received a message.
        AioResult::Recv(Ok(m)) => {
            let ms = u64::from_le_bytes(m[..].try_into().unwrap());
            aio.sleep(Duration::from_millis(ms))
        },

        // We successfully slept.
        AioResult::Sleep(Ok(_)) => {
            // We could have hung on to the request `Message` to avoid an allocation
            let _ = ctx.send(aio, Message::new())?;
            Ok(())
        },

        // Anything else is an error and an actual program should handle it.
        _ => panic!("Error in the AIO"),
    }
}

fn client(ms: u64) -> Result<()> {
    // Set up the client socket and connect to the server.
    let client = Socket::new(Protocol::Req0)?;
    client.dial(ADDRESS)?;

    // Send the request to the server and wait for a response.
    client.send(ms.to_le_bytes())?;

    // This should block for approximately `ms` milliseconds as we wait for the
    // server to sleep.
    client.recv()?;

    Ok(())
}
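The example unwraps the payload conversion, so a request that is not exactly eight bytes long would panic inside the callback. A minimal std-only sketch of a fallible alternative follows; `encode_ms` and `decode_ms` are hypothetical helpers, not part of nng.

```rust
use std::convert::TryFrom;

/// Encodes the requested sleep time the same way the client above does.
pub fn encode_ms(ms: u64) -> [u8; 8] {
    ms.to_le_bytes()
}

/// Decodes a request payload, returning `None` instead of panicking when the
/// payload is not exactly eight bytes long.
pub fn decode_ms(payload: &[u8]) -> Option<u64> {
    let bytes = <[u8; 8]>::try_from(payload).ok()?;
    Some(u64::from_le_bytes(bytes))
}
```

Inside the callback, a `None` could be treated as a malformed request rather than aborting the whole program.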

Implementations§


impl Aio


pub fn new<F>(callback: F) -> Result<Self>
where F: Fn(Aio, AioResult) + Sync + Send + 'static,

Creates a new asynchronous I/O handle.

The provided callback will be called on every single I/O event, successful or not. It is possible that the callback will be entered multiple times simultaneously.
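Because the callback may run on several threads at once, any state it captures has to be safe to share, which is what the `Fn + Sync + Send + 'static` bound enforces. The following std-only sketch shows a closure that satisfies the same bound by keeping its state in an atomic counter; `counting_callback` and `run_concurrently` are hypothetical names, and the closure takes no arguments, unlike a real `Aio` callback.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Builds a closure with the same auto-trait bounds `Aio::new` asks for.
/// Shared state lives in an atomic, so concurrent calls are safe.
pub fn counting_callback(counter: Arc<AtomicUsize>) -> impl Fn() + Send + Sync + 'static {
    move || {
        counter.fetch_add(1, Ordering::Relaxed);
    }
}

/// Calls one shared closure from several threads at once and returns the
/// total number of invocations that were recorded.
pub fn run_concurrently(threads: usize, calls_per_thread: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let callback = Arc::new(counting_callback(Arc::clone(&counter)));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let callback = Arc::clone(&callback);
            thread::spawn(move || {
                for _ in 0..calls_per_thread {
                    (*callback)();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}
```

State that cannot be expressed as an atomic would typically go behind a `Mutex`, held only briefly so the callback still returns quickly.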

§Errors
§Panics

If the callback function panics, the program will log the panic if possible and then abort. Future Rustc versions will likely do the same for uncaught panics at FFI boundaries, so this library will produce the abort in order to keep things consistent. As such, the user is responsible for either having a callback that never panics or catching and handling the panic within the callback.
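Catching a panic inside the callback can be done with `std::panic::catch_unwind`. The sketch below is std-only, and `run_guarded` is a hypothetical helper: it runs a closure, converts a panic into an `Err` carrying the panic message, and leaves the decision about what to do next to the caller.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Runs `body`, turning a panic into an `Err` with the panic message.
/// `AssertUnwindSafe` is the caller's promise that any state touched by
/// `body` is still usable after a panic.
pub fn run_guarded<F: FnOnce()>(body: F) -> Result<(), String> {
    panic::catch_unwind(AssertUnwindSafe(body)).map_err(|payload| {
        if let Some(msg) = payload.downcast_ref::<&str>() {
            (*msg).to_string()
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.clone()
        } else {
            "non-string panic payload".to_string()
        }
    })
}
```

A callback could wrap its whole body in such a guard and log the error. Note that `catch_unwind` only works when panics unwind; with `panic = "abort"` the process still terminates.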

Examples found in repository
examples/async.rs (line 71)
fn server(url: &str) -> Result<(), nng::Error> {
    // Create the socket
    let s = Socket::new(Protocol::Rep0)?;

    // Create all of the worker contexts
    let workers: Vec<_> = (0..PARALLEL)
        .map(|_| {
            let ctx = Context::new(&s)?;
            let ctx_clone = ctx.clone();
            let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
            Ok((aio, ctx))
        })
        .collect::<Result<_, nng::Error>>()?;

    // Only after we have the workers do we start listening.
    s.listen(url)?;

    // Now start all of the workers listening.
    for (a, c) in &workers {
        c.recv(a)?;
    }

    thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));

    Ok(())
}

pub fn set_timeout(&self, dur: Option<Duration>) -> Result<()>

Set the timeout of asynchronous operations.

This causes a timer to be started when the operation is actually started. If the timer expires before the operation is completed, then it is aborted with TimedOut.

Because most operations involve some context switching, it is usually a good idea to allow at least a few tens of milliseconds before timing them out; a timeout that is too small might not allow the operation to properly begin before giving up!
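One way to follow this advice is to enforce a floor on whatever timeout the caller requests. The sketch below is std-only; `MIN_TIMEOUT` and `padded_timeout` are hypothetical, and the 50 ms floor is an assumed value chosen only because the text says "a few tens of milliseconds".

```rust
use std::time::Duration;

/// Hypothetical lower bound; the documentation does not give an exact figure.
pub const MIN_TIMEOUT: Duration = Duration::from_millis(50);

/// Returns `requested`, raised to `MIN_TIMEOUT` if it is smaller.
pub fn padded_timeout(requested: Duration) -> Duration {
    requested.max(MIN_TIMEOUT)
}
```

The result could then be wrapped in `Some(...)` and passed to `set_timeout`.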

§Errors

pub fn sleep(&self, dur: Duration) -> Result<()>

Begins a sleep operation on the Aio and returns immediately.

If the sleep finishes completely, it will never return an error. If a timeout has been set and it is shorter than the duration of the sleep operation, the sleep operation will end early with TimedOut.
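The interaction between the sleep duration and the timeout can be written down as a small pure function. This is only a model of the rule stated above, not nng code: `SleepOutcome` and `expected_outcome` are hypothetical, and the case where both durations are equal is an assumption, since the text only covers a timeout that is strictly shorter.

```rust
use std::time::Duration;

/// Hypothetical description of how a sleep is expected to end.
#[derive(Debug, PartialEq)]
pub enum SleepOutcome {
    /// The sleep ran for its full duration.
    Completed(Duration),
    /// The timeout fired first, after the given duration.
    TimedOut(Duration),
}

/// Models the documented rule: a timeout shorter than the sleep ends it early.
pub fn expected_outcome(sleep: Duration, timeout: Option<Duration>) -> SleepOutcome {
    match timeout {
        Some(t) if t < sleep => SleepOutcome::TimedOut(t),
        // No timeout, or a timeout at least as long as the sleep (assumed).
        _ => SleepOutcome::Completed(sleep),
    }
}
```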

§Errors
Examples found in repository
examples/async.rs (line 98)
fn worker_callback(aio: Aio, ctx: &Context, res: AioResult) {
    match res {
        // We successfully sent the message, wait for a new one.
        AioResult::Send(Ok(_)) => ctx.recv(&aio).unwrap(),

        // We successfully received a message.
        AioResult::Recv(Ok(m)) => {
            let ms = u64::from_le_bytes(m[..].try_into().unwrap());
            aio.sleep(Duration::from_millis(ms)).unwrap();
        }

        // We successfully slept.
        AioResult::Sleep(Ok(_)) => {
            ctx.send(&aio, Message::new()).unwrap();
        }

        // Anything else is an error and we will just panic.
        AioResult::Send(Err((_, e))) | AioResult::Recv(Err(e)) | AioResult::Sleep(Err(e)) => {
            panic!("Error: {}", e)
        }
    }
}

pub fn wait(&self)

Blocks the current thread until the current asynchronous operation completes.

If there are no operations running then this function returns immediately. This function should not be called from within the completion callback.
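The waiting behavior described here can be modeled with a flag and a condition variable. The sketch below is std-only and does not reflect how nng implements `wait`; `OpState` and `run_operation` are hypothetical names. In this model, the code that signals completion is the only thing that can release a waiter, which hints at why waiting from inside the completion path itself would never return.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical model of "is an operation currently running?".
#[derive(Default)]
pub struct OpState {
    running: Mutex<bool>,
    done: Condvar,
}

impl OpState {
    /// Marks an operation as started.
    pub fn start(&self) {
        *self.running.lock().unwrap() = true;
    }

    /// Marks the operation as finished and wakes every waiter.
    pub fn finish(&self) {
        *self.running.lock().unwrap() = false;
        self.done.notify_all();
    }

    /// Blocks until no operation is running; returns at once if idle.
    pub fn wait(&self) {
        let mut running = self.running.lock().unwrap();
        while *running {
            running = self.done.wait(running).unwrap();
        }
    }

    /// Reports whether an operation is currently marked as running.
    pub fn is_running(&self) -> bool {
        *self.running.lock().unwrap()
    }
}

/// Simulates an operation that completes on another thread after `delay`.
pub fn run_operation(state: &Arc<OpState>, delay: Duration) -> JoinHandle<()> {
    state.start();
    let state = Arc::clone(state);
    thread::spawn(move || {
        thread::sleep(delay);
        state.finish();
    })
}
```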


pub fn cancel(&self)

Cancel the currently running I/O operation.

Trait Implementations§

impl Clone for Aio

fn clone(&self) -> Aio

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Since 1.0.0. Performs copy-assignment from source.

impl Debug for Aio

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Hash for Aio

fn hash<H: Hasher>(&self, state: &mut H)

Feeds this value into the given Hasher.

fn hash_slice<H>(data: &[Self], state: &mut H)
where H: Hasher, Self: Sized,

Since 1.3.0. Feeds a slice of this type into the given Hasher.

impl PartialEq for Aio

fn eq(&self, other: &Aio) -> bool

Tests for self and other values to be equal, and is used by ==.

fn ne(&self, other: &Rhs) -> bool

Since 1.0.0. Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.

impl Eq for Aio

Auto Trait Implementations§

impl Freeze for Aio

impl RefUnwindSafe for Aio

impl Send for Aio

impl Sync for Aio

impl Unpin for Aio

impl UnwindSafe for Aio

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)

Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.