Struct AsyncContext

pub struct AsyncContext<'a> { /* private fields */ }

A wrapper type around Context to enable async-await send and receive operations.

This type allows for asynchronous context-based send and receive operations. With AsyncSocket, and with sockets in general, managing concurrent sends and receives can be difficult, depending on which scalability protocol the socket implements. For example, consider the following program:

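use async_nng::AsyncSocket;
use std::time::Duration;

struct MyMessageHandler {
    // Assume a Req0 socket
    socket: AsyncSocket,
}

impl MyMessageHandler {
    // `send` and `receive` take `&mut self`, so this method needs `&mut self` too.
    async fn some_api_call(&mut self, msg: nng::Message) -> Result<(), nng::Error> {
        // On failure `send` returns the unsent message with the error; keep only the error.
        self.socket.send(msg, None).await.map_err(|(_, err)| err)?;

        // a really long operation
        smol::Timer::after(Duration::from_millis(3000)).await;

        let _reply = self.socket.receive(None).await?;
        Ok(())
    }
}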

We have some message handling type whose task may or may not yield at one of the await points above. If it yields on the middle await point (our really long operation), then we might have an issue, depending on the protocol being used. Above, we’re using req-rep, which generally (unless you’re using sockets in raw mode) has the restriction that you cannot send two different requests before the reply is received.

Therefore, if one tries to run multiple message handlers or API calls concurrently from different tasks or threads, one risks violating the req-rep cycle invariants because of long-running awaits between the send and the receive.

Access to an AsyncSocket is exclusive because AsyncSocket::send and AsyncSocket::receive take &mut self instead of &self. The underlying nng crate does not actually require this; it is done to prevent multiple accesses to the same underlying AIO object in Rust. This still runs into an issue, however. Users who want to make concurrent API calls such as the one above will then wrap the async socket in something like an Arc<Mutex<_>>. If that mutex is asynchronous and is released across await points, concurrent calls to these APIs can still fail because, for example, two sends were issued before a receive.

§Contexts

The way around this is to use contexts. Each context can be constructed locally on the stack and can then independently manage sending and receiving in a safe way, without requiring a lock on the socket as a whole:

use async_nng::AsyncContext;
use nng::Socket;
use std::time::Duration;

struct MyMessageHandler {
    // Assume a Req0 socket
    socket: Socket,
}

impl MyMessageHandler {
    async fn some_api_call(&self, msg: nng::Message) -> Result<(), nng::Error> {
        // Notice that we use a regular borrow on `&self.socket`, not `&mut self.socket`.
        //
        // Contexts defined locally will not conflict with one another, which makes writing
        // asynchronous, concurrent programs easier.
        //
        // The context itself is `mut` because `send` and `receive` take `&mut self`.
        let mut context = AsyncContext::try_from(&self.socket)?;

        // On failure `send` returns the unsent message with the error; keep only the error.
        context.send(msg, None).await.map_err(|(_, err)| err)?;

        // a really long operation
        smol::Timer::after(Duration::from_millis(3000)).await;

        let _reply = context.receive(None).await?;
        Ok(())
    }
}

Contexts cannot be used with raw-mode sockets. However, by borrowing the underlying socket, each context can operate concurrently and independently, without the aforementioned runtime bugs that arise from trying to use e.g. a req-rep socket concurrently.
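The difference between sharing one socket and giving each call its own context can be illustrated with a toy model. The sketch below does not use nng at all: ReqState, IncorrectState, shared_state, and separate_states are hypothetical names, and it assumes that a request sent while another is outstanding displaces the earlier one, which is how the req protocol is generally described.

```rust
/// Toy stand-in for one request/reply state machine. This is NOT nng code.
#[derive(Debug, Default)]
struct ReqState {
    /// Id of the request whose reply is still pending, if any.
    outstanding: Option<u32>,
}

/// Hypothetical error, named after the `IncorrectState` error listed for `receive`.
#[derive(Debug, PartialEq)]
struct IncorrectState;

impl ReqState {
    /// Assumption: a new request replaces (cancels) any outstanding one.
    fn send(&mut self, request_id: u32) {
        self.outstanding = Some(request_id);
    }

    /// Returns the id of the request being answered, or an error if nothing is pending.
    fn receive(&mut self) -> Result<u32, IncorrectState> {
        self.outstanding.take().ok_or(IncorrectState)
    }
}

/// Callers A (request 1) and B (request 2) share one state machine.
/// Returns what A and B each observe from `receive`.
fn shared_state() -> (Result<u32, IncorrectState>, Result<u32, IncorrectState>) {
    let mut shared = ReqState::default();
    shared.send(1); // A sends, then awaits its long operation
    shared.send(2); // B sends in the meantime, displacing A's request
    let a = shared.receive(); // A gets the reply meant for B
    let b = shared.receive(); // B finds nothing pending
    (a, b)
}

/// The same interleaving, but each caller owns its state, as with one context per call.
fn separate_states() -> (Result<u32, IncorrectState>, Result<u32, IncorrectState>) {
    let mut ctx_a = ReqState::default();
    let mut ctx_b = ReqState::default();
    ctx_a.send(1);
    ctx_b.send(2);
    let a = ctx_a.receive();
    let b = ctx_b.receive();
    (a, b)
}
```

Under these assumptions, shared_state() returns (Ok(2), Err(IncorrectState)): caller A receives the reply intended for B, and B finds nothing pending. separate_states() returns (Ok(1), Ok(2)), because each state machine only ever sees its own request.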

In almost all cases, one should prefer to use the AsyncContext type when possible. It provides better guarantees about concurrent access, which for asynchronous code is necessarily a concern.

Implementations§

impl<'a> AsyncContext<'a>

pub async fn send<M>( &mut self, msg: M, timeout: Option<Duration>, ) -> Result<(), (Message, Error)>
where M: Into<Message>,

Sends a Message to the socket asynchronously.

§Errors
  • IncorrectState: The internal Aio is already running an operation, or the socket cannot send messages in its current state.
  • MessageTooLarge: The message is too large.
  • NotSupported: The protocol does not support sending messages.
  • OutOfMemory: Insufficient memory available.
  • TimedOut: The operation timed out.
  • Closed: The context or socket has been closed and future operations will not work.
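
Because a failed send returns the original message together with the error, a caller can retry without cloning the message up front. The following is a minimal synchronous sketch of that pattern, not part of async_nng: send_with_retry is a hypothetical helper, and send_once stands in for a call such as context.send(msg, timeout), modeled here as a plain closure so the example needs only the standard library.

```rust
/// Tries a fallible send up to `max_attempts` times (always at least once).
///
/// Hypothetical helper: `send_once` mimics the shape of `AsyncContext::send`, which
/// returns the unsent message alongside the error on failure.
fn send_with_retry<M, E>(
    mut send_once: impl FnMut(M) -> Result<(), (M, E)>,
    mut msg: M,
    max_attempts: usize,
) -> Result<(), (M, E)> {
    let mut attempt = 1;
    loop {
        match send_once(msg) {
            Ok(()) => return Ok(()),
            // Out of attempts: hand both the message and the last error to the caller.
            Err(failure) if attempt >= max_attempts => return Err(failure),
            // The message comes back with the error, so it can be reused directly.
            Err((returned, _error)) => {
                msg = returned;
                attempt += 1;
            }
        }
    }
}
```

In real code one would likely retry only on errors that can be transient, such as TimedOut, and return immediately on errors such as Closed.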

pub async fn receive( &mut self, timeout: Option<Duration>, ) -> Result<Message, Error>

Receives a Message from the socket asynchronously.

§Errors
  • IncorrectState: The internal Aio is already running an operation, or the socket cannot receive messages in its current state.
  • MessageTooLarge: The message is too large.
  • NotSupported: The protocol does not support receiving messages.
  • OutOfMemory: Insufficient memory available.
  • TimedOut: The operation timed out.
  • Closed: The context or socket has been closed and future operations will not work.

Trait Implementations§

impl<'a> Debug for AsyncContext<'a>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<'a> TryFrom<&'a Socket> for AsyncContext<'a>

type Error = Error

The type returned in the event of a conversion error.

fn try_from(socket: &'a Socket) -> Result<Self, Self::Error>

Performs the conversion.

Auto Trait Implementations§

impl<'a> Freeze for AsyncContext<'a>

impl<'a> RefUnwindSafe for AsyncContext<'a>

impl<'a> Send for AsyncContext<'a>

impl<'a> Sync for AsyncContext<'a>

impl<'a> !Unpin for AsyncContext<'a>

impl<'a> UnwindSafe for AsyncContext<'a>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.