pub struct AsyncContext<'a> { /* private fields */ }
Expand description
A wrapper type around Context
to enable async-await send and receive
operations.
This type allows for asynchronous context-based send and receive operations. One trouble with
AsyncSocket
and Socket
s in general is that managing concurrent sends and receives can be
troublesome depending on which scalability protocol your socket implements. For example,
consider the following program:
use async_nng::AsyncSocket;
struct MyMessageHandler {
// Assume a Rep0 socket
socket: AsyncSocket,
}
impl MyMessageHandler {
async fn some_api_call(&self, msg: nng::Message) -> Result<(), nng::Error> {
self.socket.send(msg, None).await;
// a really long operation
smol::Timer::after(Duration::from_millis(3000)).await;
let reply = self.socket.receive(None).await;
Ok(())
}
}
We have some message handling type, which may or may not break at one of the await points above. If it breaks ono the middle await point (our really long operation), then we might have an issue depending on the protocol being used. Above, we’re using req-rep, which has the restriction that generally (unless you’re using sockets in raw mode) that you cannot send two different requests before the reply is received.
Therefore, if one tries to run multiple message handlers or API calls from different tasks or threads concurrently they are at risk to run into an issue where the req-rep cycle invariants are violated because of long-awaiting tasks in between the send and receive.
This is exclusive because AsyncSocket::send
and AsyncSocket::receive
take &mut self
instead of &self
. The underlying nng
crate does not actually require this, but is done to
prevent multiple accesses of the same underlying AIO object in Rust. This still runs into an
issue. If users still want to concurrently construct API calls such as the above, they will
then choose to wrap the async socket inside of something like an Arc<Mutex<_>>
. If that mutex
is asynchronous and releases across await points, then you still have the issue that concurrent
calls to these APIs can fail because e.g. two sends were called before a receive was.
§Contexts
The way around this is to use contexts. Each context can be constructed locally on the stack, and then indepedently manage sending and receiving in a safe way, without requiring a lock on the socket as a whole:
use async_nng::AsyncContext;
use nng::Socket;
struct MyMessageHandler {
// Assume a Rep0 socket
socket: Socket,
}
impl MyMessageHandler {
async fn some_api_call(&self, msg: nng::Message) -> Result<(), nng::Error> {
// Notice that we use a regular borrow on `&self.socket`, not `&mut self.socket`.
//
// Contexts defined locally will not conflict with one another, which makes writing
// asynchronous, concurrent programs easier.
let context = AsyncContext::try_from(&self.socket)?;
context.send(msg, None).await;
// a really long operation
smol::Timer::after(Duration::from_millis(3000)).await;
let reply = context.receive(None).await;
Ok(())
}
}
Contexts cannot be used with raw-mode sockets; however, by borrowing the underlying socket they are able to create an object that can concurrently and independently operate without the aforementioned bugs that can arise at runtime due to trying to use e.g. a req-rep socket concurrently.
In almost all cases, one should prefer to use the AsyncContext
type when possible. It
provides better guarantees about concurrent access, which for asynchronous code is necessarily
a concern.
Implementations§
Source§impl<'a> AsyncContext<'a>
impl<'a> AsyncContext<'a>
Sourcepub async fn send<M>(
&mut self,
msg: M,
timeout: Option<Duration>,
) -> Result<(), (Message, Error)>
pub async fn send<M>( &mut self, msg: M, timeout: Option<Duration>, ) -> Result<(), (Message, Error)>
Sends a Message
to the socket asynchronously.
§Errors
IncorrectState
if the internalAio
is already running an operation, or the socket cannot send messages in its current state.MessageTooLarge
: The message is too large.NotSupported
: The protocol does not support sending messages.OutOfMemory
: Insufficient memory available.TimedOut
: The operation timed out.Closed
: The context or socket has been closed and future operations will not work.
Sourcepub async fn receive(
&mut self,
timeout: Option<Duration>,
) -> Result<Message, Error>
pub async fn receive( &mut self, timeout: Option<Duration>, ) -> Result<Message, Error>
Receives a Message
from the socket asynchronously.
§Errors
IncorrectState
if the internalAio
is already running an operation, or the socket cannot receive messages in its current state.MessageTooLarge
: The message is too large.NotSupported
: The protocol does not support sending messages.OutOfMemory
: Insufficient memory available.TimedOut
: The operation timed out.Closed
: The context or socket has been closed and future operations will not work.