pub struct ContextfulSocket<'socket, Protocol> { /* private fields */ }Expand description
An async-ready socket with an associated context for concurrent operations.
ContextfulSocket wraps a Socket with an NNG context, enabling safe concurrent
operations on the same underlying socket. Each context maintains independent protocol
state, which is crucial for protocols like Request/Reply where the state machine
must be maintained per operation.
§Why contexts are necessary
NNG sockets themselves are not safe for concurrent operations. Attempting concurrent operations directly on a socket can lead to:
- Protocol state violations (e.g., receiving before sending in REQ/REP)
- Message corruption or loss
- Deadlocks or race conditions
Contexts solve this by providing independent protocol state machines that share the same underlying transport and configuration.
§Cancellation safety
All async operations on ContextfulSocket are cancellation safe. You can safely
use this type with tokio::select! and other cancellation patterns:
- Receive operations: If cancelled after a message arrives, that message will generally be
returned by the next receive call on the same context, though the exact semantics depend on
the protocol used. Prefer pinning receives outside of
select!loops to avoid potential starvation. - Send operations: If cancelled, the message may or may not be sent (depending on timing). The message will be dropped if not sent.
- Resource cleanup: All resources are properly cleaned up on cancellation with no leaks or undefined behavior.
The library handles all NNG cleanup requirements internally, including calling
nng_aio_cancel and waiting for operations to fully terminate.
Implementations§
Source§impl<'socket> ContextfulSocket<'socket, Req0>
impl<'socket> ContextfulSocket<'socket, Req0>
Sourcepub async fn request<'s>(
&'s mut self,
message: Message,
) -> Result<impl Future<Output = Result<Message, AioError>> + use<'s, 'socket>, (AioError, Message)>
pub async fn request<'s>( &'s mut self, message: Message, ) -> Result<impl Future<Output = Result<Message, AioError>> + use<'s, 'socket>, (AioError, Message)>
Sends a request and returns a future that will resolve to the reply.
This method implements the client side of the Request/Reply pattern. It sends the provided message as a request and returns a future that will resolve to the reply message when it arrives.
§Two-Phase Operation
The request operation is split into two phases:
- Send phase: Returns immediately with a future if the send succeeds
- Reply phase: The returned future awaits the reply
This design allows you to handle send failures immediately while still benefiting from async reply handling.
§Cancellation safety
This function is cancellation safe. If the send phase is cancelled, the message may or may not be sent (depending on when cancellation occurs relative to NNG’s internal dispatch). The message will be dropped if not sent. If the reply future is cancelled, any incoming reply will be discarded, and a new request can be sent immediately.
§Protocol state
Each context maintains independent protocol state. You can have multiple outstanding requests across different contexts, but each individual context must complete its request-reply cycle before starting a new one. This is enforced at compile-time.
§Errors
Returns Err((error, message)) if the send operation fails, giving you back
the message for potential retry. The reply future can fail with
network errors, timeouts, or cancellation.
§Examples
use anng::{protocols::reqrep0, Message, AioError};
use std::io::Write;
let socket = reqrep0::Req0::dial(c"inproc://request-doctest").await?;
let mut ctx = socket.context();
let mut request = Message::with_capacity(100);
write!(&mut request, "Hello")?;
match ctx.request(request).await {
Ok(reply_future) => {
match reply_future.await {
Ok(reply) => println!("Got reply: {:?}", reply.as_slice()),
Err(AioError::TimedOut) => println!("Request timed out"),
Err(e) => println!("Request failed: {:?}", e),
}
}
Err((error, msg)) => {
println!("Send failed: {:?}", error);
// Could retry with `msg`
}
}Source§impl<'socket> ContextfulSocket<'socket, Rep0>
impl<'socket> ContextfulSocket<'socket, Rep0>
Sourcepub async fn receive<'s>(
&'s mut self,
) -> Result<(Message, Responder<'s, 'socket>), AioError>
pub async fn receive<'s>( &'s mut self, ) -> Result<(Message, Responder<'s, 'socket>), AioError>
Receives a request and returns the message along with a means to reply.
This method implements the server side of the Request/Reply pattern.
It waits for an incoming request and returns both the request message
and a Responder that must be used to send the reply.
§Responder Pattern
The returned Responder ensures that:
- Each request gets exactly one reply
- Protocol state is maintained correctly
- Resources are cleaned up if the responder is dropped
You must call Responder::reply to complete the request-reply cycle.
Dropping the responder without replying will lead to the client re-sending
their request.
§Cancellation safety
This function is cancellation safe. If cancelled after a request message has
been received, that message will be returned by the next call to receive() on
the same context. No request messages are lost due to cancellation. This ensures
that clients don’t need to resend requests unnecessarily.
§Protocol state
Each context maintains independent protocol state. After receiving a
request, that context cannot receive another request until it has
sent a reply using the Responder.
§Examples
use anng::{protocols::reqrep0, Message};
use std::io::Write;
let socket = reqrep0::Rep0::listen(c"inproc://receive-doctest-echo").await?;
let mut ctx = socket.context();
loop {
let (request, responder) = ctx.receive().await?;
// Echo the request back as a reply
let mut reply = Message::with_capacity(request.len());
reply.write(request.as_slice())?;
// TODO: In production, handle error and retry with returned responder and message
responder.reply(reply).await.unwrap();
}Source§impl<'socket> ContextfulSocket<'socket, Surveyor0>
impl<'socket> ContextfulSocket<'socket, Surveyor0>
Sourcepub async fn survey<'s>(
&'s mut self,
message: Message,
timeout: Duration,
) -> Result<SurveyResponses<'s, 'socket>, (AioError, Message)>
pub async fn survey<'s>( &'s mut self, message: Message, timeout: Duration, ) -> Result<SurveyResponses<'s, 'socket>, (AioError, Message)>
Sends a survey and returns a stream of responses.
This method implements the surveyor side of the Surveyor/Respondent pattern. It broadcasts the survey to all connected respondents and returns a response stream that yields responses as they arrive, up to the specified timeout.
§Survey Timeout
The provided timeout determines how long to wait for responses. After the timeout expires,
SurveyResponses::next will None. The timeout cannot exceed i32::MAX milliseconds;
if it does, this function panics.
§Survey Cancellation
Sending a new survey automatically cancels any ongoing survey. Late responses to the cancelled survey will be discarded.
§Cancellation safety
This function is cancellation safe. If cancelled before the survey is sent, the message will be dropped. If the returned response stream is dropped, the survey remains active until its timeout expires or a new survey is sent on this context. Dropping the stream simply means you won’t receive the responses, but respondents can still send them.
§Errors
Returns Err((error, message)) if the survey operation fails, giving you
back the message for potential retry.
§Examples
use anng::{protocols::survey0, Message};
use std::{io::Write, time::Duration};
let socket = survey0::Surveyor0::listen(c"inproc://vote").await?;
let mut ctx = socket.context();
let mut survey = Message::with_capacity(100);
write!(&mut survey, "Should we upgrade to version 2.0?")?;
let timeout = Duration::from_secs(1);
match ctx.survey(survey, timeout).await {
Ok(mut responses) => {
while let Some(response) = responses.next().await {
match response {
Ok(msg) => println!("Vote: {:?}", msg.as_slice()),
Err(e) => println!("Survey error: {:?}", e),
}
}
println!("Survey completed");
}
Err((error, msg)) => {
println!("Failed to send survey: {:?}", error);
// Could retry with `msg`
}
}Source§impl<'socket> ContextfulSocket<'socket, Respondent0>
impl<'socket> ContextfulSocket<'socket, Respondent0>
Sourcepub async fn next_survey<'s>(
&'s mut self,
) -> Result<(Message, SurveyResponder<'s, 'socket>), AioError>
pub async fn next_survey<'s>( &'s mut self, ) -> Result<(Message, SurveyResponder<'s, 'socket>), AioError>
Receives the next survey and returns the message along with a responder.
This method implements the respondent side of the Surveyor/Respondent pattern.
It waits for an incoming survey and returns both the survey message and a
SurveyResponder that can be used to send a response.
§Responding to a survey
The returned SurveyResponder allows you to send a response, but responding
is entirely optional. You can choose to:
- Respond immediately with
SurveyResponder::respond - Drop the responder to not respond to this survey
- Store the responder and respond later (within the timeout)
Every survey has a timeout set by the surveyor, and if the response is not received within the timeout, it is discarded. This timeout is invisible to the respondents.
§Cancellation safety
This function is cancellation safe. If cancelled after a survey message has
been received, that message will be returned by the next call to next_survey()
on the same context. No survey messages are lost due to cancellation.
§Examples
use anng::{protocols::survey0, Message};
use std::io::Write;
let socket = survey0::Respondent0::dial(c"inproc://service-discovery").await?;
let mut ctx = socket.context();
loop {
let (survey, responder) = ctx.next_survey().await?;
// Check if we want to respond to this survey
if survey.as_slice() == b"health-check" {
let mut response = Message::with_capacity(50);
write!(&mut response, "healthy")?;
// TODO: In production, handle error and retry with returned responder and message
responder.respond(response).await.unwrap();
}
// Otherwise, ignore the survey by not using the responder
}Sourcepub fn try_next_survey<'s>(
&'s mut self,
) -> Result<Option<(Message, SurveyResponder<'s, 'socket>)>, AioError>
pub fn try_next_survey<'s>( &'s mut self, ) -> Result<Option<(Message, SurveyResponder<'s, 'socket>)>, AioError>
Non-blocking variant of next_survey.
Returns Ok(Some((message, responder))) if immediately available, Ok(None) if no survey is available.
Source§impl<'socket> ContextfulSocket<'socket, Sub0>
impl<'socket> ContextfulSocket<'socket, Sub0>
Sourcepub async fn next(&mut self) -> Result<Message, AioError>
pub async fn next(&mut self) -> Result<Message, AioError>
Receives the next message matching the socket’s subscriptions.
This method waits for the next published message that matches any of the socket’s topic subscriptions. If no subscriptions have been set up, this will never return a message.
§Message ordering
Messages are delivered in the order they arrive, but there are no guarantees about ordering across different publishers or topics.
§Cancellation safety
This function is cancellation safe. If cancelled after a message is received,
that message will be returned by the next call to next() on the same context.
Note that the PUB/SUB protocol allows message loss by design - messages may be
dropped during normal operation due to slow subscribers or network conditions,
independent of cancellation.
§Examples
use anng::protocols::pubsub0;
let socket = pubsub0::Sub0::dial(c"inproc://next-doctest-feed").await?;
let mut sub = socket.context();
sub.subscribe_to(b"news.");
loop {
let msg = sub.next().await?;
println!("Received: {:?}", msg.as_slice());
}Sourcepub fn try_next(&mut self) -> Result<Option<Message>, AioError>
pub fn try_next(&mut self) -> Result<Option<Message>, AioError>
Non-blocking variant of next.
Returns Ok(Some(message)) if immediately available, Ok(None) if no message is available.
Sourcepub fn prefer_new(&mut self, prefer: bool)
pub fn prefer_new(&mut self, prefer: bool)
Sets preference for what to do with new messages when receive queue is full.
When true (default): drops oldest message to make room for new ones.
When false: rejects new messages, preserving older ones.
Sourcepub fn disable_filtering(&mut self)
pub fn disable_filtering(&mut self)
Disables topic filtering to receive all published messages.
This is equivalent to subscribing to an empty topic (b""), which
matches all messages. After calling this method, the socket will
receive every message published to the connected publishers.
Note: Subsequent subscribe_to() calls are effectively ignored
since the empty subscription matches all messages regardless of other filters.
§Performance impact
Disabling filtering means receiving all messages, which can impact performance in high-volume scenarios. Use specific subscriptions when possible to reduce network and processing overhead.
§Examples
use anng::protocols::pubsub0;
let socket = pubsub0::Sub0::dial(c"inproc://sub0-disable-filtering-demo").await?;
let mut sub = socket.context();
// Receive all messages regardless of topic
sub.disable_filtering();
// This will now receive every published message
loop {
let msg = sub.next().await?;
println!("Got: {:?}", msg.as_slice());
}Sourcepub fn subscribe_to(&mut self, topic: &[u8])
pub fn subscribe_to(&mut self, topic: &[u8])
Subscribes to messages with the specified topic prefix.
Only messages whose content starts with the given topic prefix will be delivered to this socket. Multiple subscriptions can be active simultaneously - a message matching any subscription will be delivered.
§Topic matching
Topic matching is based on byte-level prefix comparison:
b"news."matchesb"news.sports",b"news.weather", etc.b""(empty) matches all messages (equivalent todisable_filtering)- Matching is case-sensitive and exact
§Performance
Topic filtering happens at the subscriber side, so all messages are still transmitted over the network. For high-volume scenarios with many unwanted messages, consider protocol-level filtering if available.
§Examples
use anng::protocols::pubsub0;
let socket = pubsub0::Sub0::dial(c"inproc://sub0-subscribe-to-demo").await?;
let mut sub = socket.context();
// Subscribe to specific topics
sub.subscribe_to(b"weather.forecast");
sub.subscribe_to(b"news.breaking");
sub.subscribe_to(b"alerts."); // Matches all alerts
// Will receive any message starting with these prefixesSourcepub fn unsubscribe_from(&mut self, topic: &[u8]) -> bool
pub fn unsubscribe_from(&mut self, topic: &[u8]) -> bool
Removes a subscription for the specified topic prefix.
After calling this method, messages with the specified prefix will no longer be delivered to this socket (unless they match other active subscriptions).
§Return Value
Returns true if the subscription was previously active and has been
removed, and false otherwise.
§Examples
use anng::protocols::pubsub0;
let socket = pubsub0::Sub0::dial(c"inproc://sub0-unsubscribe-demo").await?;
let mut sub = socket.context();
sub.subscribe_to(b"news.");
sub.subscribe_to(b"weather.");
// Remove news subscription
let was_subscribed = sub.unsubscribe_from(b"news.");
assert!(was_subscribed);
// Try to remove non-existent subscription
let was_subscribed = sub.unsubscribe_from(b"sports.");
assert!(!was_subscribed);