ContextfulSocket

Struct ContextfulSocket 

pub struct ContextfulSocket<'socket, Protocol> { /* private fields */ }

An async-ready socket with an associated context for concurrent operations.

ContextfulSocket wraps a Socket with an NNG context, enabling safe concurrent operations on the same underlying socket. Each context maintains independent protocol state, which is crucial for protocols like Request/Reply where the state machine must be maintained per operation.

§Why contexts are necessary

NNG sockets themselves are not safe for concurrent operations. Attempting concurrent operations directly on a socket can lead to:

  • Protocol state violations (e.g., receiving before sending in REQ/REP)
  • Message corruption or loss
  • Deadlocks or race conditions

Contexts solve this by providing independent protocol state machines that share the same underlying transport and configuration.

§Cancellation safety

All async operations on ContextfulSocket are cancellation safe. You can safely use this type with tokio::select! and other cancellation patterns:

  • Receive operations: If cancelled after a message arrives, that message will generally be returned by the next receive call on the same context, though the exact semantics depend on the protocol used. Prefer pinning receives outside of select! loops to avoid potential starvation.
  • Send operations: If cancelled, the message may or may not be sent (depending on timing). The message will be dropped if not sent.
  • Resource cleanup: All resources are properly cleaned up on cancellation with no leaks or undefined behavior.

The library handles all NNG cleanup requirements internally, including calling nng_aio_cancel and waiting for operations to fully terminate.

Implementations§

impl<'socket> ContextfulSocket<'socket, Req0>

pub async fn request<'s>( &'s mut self, message: Message, ) -> Result<impl Future<Output = Result<Message, AioError>> + use<'s, 'socket>, (AioError, Message)>

Sends a request and returns a future that will resolve to the reply.

This method implements the client side of the Request/Reply pattern. It sends the provided message as a request and returns a future that will resolve to the reply message when it arrives.

§Two-Phase Operation

The request operation is split into two phases:

  1. Send phase: Returns immediately with a future if the send succeeds
  2. Reply phase: The returned future awaits the reply

This design allows you to handle send failures immediately while still benefiting from async reply handling.
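
The shape of this two-phase API can be sketched without the library. The following is a minimal, self-contained illustration using only the standard library. The names `mock_request`, `block_on`, and `ThreadWaker` are hypothetical, messages are modeled as `Vec<u8>`, errors as `String`, and the "reply" is simply the request reversed. It is not how anng implements `request`; it only shows how the outer `Result` reports the send phase while the inner future carries the reply phase.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor so the sketch needs no async runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical stand-in for a two-phase request (not the anng API).
/// Outer `Result`: outcome of the send phase; the message is handed back on failure.
/// Inner future: resolves to the outcome of the reply phase.
async fn mock_request(
    message: Vec<u8>,
    link_up: bool,
) -> Result<impl Future<Output = Result<Vec<u8>, String>>, (String, Vec<u8>)> {
    if !link_up {
        // Send phase failed: return the error together with the unsent message.
        return Err(("send failed".to_string(), message));
    }
    // Send phase succeeded: return a future for the reply phase.
    Ok(async move {
        let reply: Vec<u8> = message.into_iter().rev().collect();
        Ok::<Vec<u8>, String>(reply)
    })
}
```

A caller awaits once to learn whether the send succeeded and a second time to obtain the reply, mirroring the nested `match` in the example for this method.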

§Cancellation safety

This function is cancellation safe. If the send phase is cancelled, the message may or may not be sent (depending on when cancellation occurs relative to NNG’s internal dispatch). The message will be dropped if not sent. If the reply future is cancelled, any incoming reply will be discarded, and a new request can be sent immediately.

§Protocol state

Each context maintains independent protocol state. You can have multiple outstanding requests across different contexts, but each individual context must complete its request-reply cycle before starting a new one. This is enforced at compile-time.

§Errors

Returns Err((error, message)) if the send operation fails, giving you back the message for potential retry. The reply future can fail with network errors, timeouts, or cancellation.

§Examples
use anng::{protocols::reqrep0, Message, AioError};
use std::io::Write;

let socket = reqrep0::Req0::dial(c"inproc://request-doctest").await?;
let mut ctx = socket.context();

let mut request = Message::with_capacity(100);
write!(&mut request, "Hello")?;

match ctx.request(request).await {
    Ok(reply_future) => {
        match reply_future.await {
            Ok(reply) => println!("Got reply: {:?}", reply.as_slice()),
            Err(AioError::TimedOut) => println!("Request timed out"),
            Err(e) => println!("Request failed: {:?}", e),
        }
    }
    Err((error, msg)) => {
        println!("Send failed: {:?}", error);
        // Could retry with `msg`
    }
}

impl<'socket> ContextfulSocket<'socket, Rep0>

pub async fn receive<'s>( &'s mut self, ) -> Result<(Message, Responder<'s, 'socket>), AioError>

Receives a request and returns the message along with a means to reply.

This method implements the server side of the Request/Reply pattern. It waits for an incoming request and returns both the request message and a Responder that must be used to send the reply.

§Responder Pattern

The returned Responder ensures that:

  • Each request gets exactly one reply
  • Protocol state is maintained correctly
  • Resources are cleaned up if the responder is dropped

You must call Responder::reply to complete the request-reply cycle. Dropping the responder without replying will lead to the client re-sending their request.

§Cancellation safety

This function is cancellation safe. If cancelled after a request message has been received, that message will be returned by the next call to receive() on the same context. No request messages are lost due to cancellation. This ensures that clients don’t need to resend requests unnecessarily.

§Protocol state

Each context maintains independent protocol state. After receiving a request, that context cannot receive another request until it has sent a reply using the Responder.
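
The signature of `receive` suggests how this can be checked by the borrow checker: the `Responder<'s, 'socket>` borrows the context mutably for `'s`. The following standard-library-only sketch shows that idea under stated assumptions. `MockContext`, `MockResponder`, and `echo_all` are hypothetical names, requests are plain `Vec<u8>` values, and the mock is synchronous; it illustrates the borrowing pattern, not anng's internals.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a REP context: queued requests and sent replies.
struct MockContext {
    inbox: VecDeque<Vec<u8>>,
    sent: Vec<Vec<u8>>,
}

/// Hypothetical responder that borrows part of the context mutably.
struct MockResponder<'s> {
    sent: &'s mut Vec<Vec<u8>>,
}

impl MockContext {
    /// Hands out the next request plus a responder tied to `&mut self`.
    fn receive(&mut self) -> Option<(Vec<u8>, MockResponder<'_>)> {
        let request = self.inbox.pop_front()?;
        Some((request, MockResponder { sent: &mut self.sent }))
    }
}

impl MockResponder<'_> {
    /// Consumes the responder, so a second reply to the same request
    /// cannot be written.
    fn reply(self, message: Vec<u8>) {
        self.sent.push(message);
    }
}

/// Echoes every queued request and returns how many were handled.
fn echo_all(ctx: &mut MockContext) -> usize {
    let mut handled = 0;
    while let Some((request, responder)) = ctx.receive() {
        // Calling `ctx.receive()` again at this point would not compile,
        // because `responder` (used on the next line) still holds the
        // mutable borrow of `ctx`.
        responder.reply(request);
        handled += 1;
    }
    handled
}
```

Because `reply` takes `self` by value, the responder cannot be reused, and because it borrows the context, the next `receive` has to wait until the responder is consumed or dropped.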

§Examples
use anng::{protocols::reqrep0, Message};
use std::io::Write;

let socket = reqrep0::Rep0::listen(c"inproc://receive-doctest-echo").await?;
let mut ctx = socket.context();

loop {
    let (request, responder) = ctx.receive().await?;

    // Echo the request back as a reply
    let mut reply = Message::with_capacity(request.len());
    // `write_all` keeps writing until the whole slice is copied; a bare `write` may stop early
    reply.write_all(request.as_slice())?;

    // TODO: In production, handle error and retry with returned responder and message
    responder.reply(reply).await.unwrap();
}

pub fn try_receive<'s>( &'s mut self, ) -> Result<Option<(Message, Responder<'s, 'socket>)>, AioError>

Non-blocking variant of receive.

Returns Ok(Some((message, responder))) if immediately available, Ok(None) if no request is available.

impl<'socket> ContextfulSocket<'socket, Surveyor0>

pub async fn survey<'s>( &'s mut self, message: Message, timeout: Duration, ) -> Result<SurveyResponses<'s, 'socket>, (AioError, Message)>

Sends a survey and returns a stream of responses.

This method implements the surveyor side of the Surveyor/Respondent pattern. It broadcasts the survey to all connected respondents and returns a response stream that yields responses as they arrive, up to the specified timeout.

§Survey Timeout

The provided timeout determines how long to wait for responses. After the timeout expires, SurveyResponses::next returns None. The timeout cannot exceed i32::MAX milliseconds; if it does, this function panics.
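
Since an oversized timeout panics, callers that take the duration from configuration or user input may want to validate it first. A minimal sketch of such a check, using only the standard library; `survey_timeout_millis` is a hypothetical helper, not part of anng, and it assumes sub-millisecond remainders are truncated (the source does not say how the library rounds):

```rust
use std::time::Duration;

/// Hypothetical helper: returns the timeout in whole milliseconds if it
/// fits in an `i32`, or `None` if it exceeds `i32::MAX` milliseconds.
fn survey_timeout_millis(timeout: Duration) -> Option<i32> {
    // `as_millis` yields a `u128`; the conversion fails for values above `i32::MAX`.
    i32::try_from(timeout.as_millis()).ok()
}
```

A caller could reject or clamp the duration when this returns `None` instead of letting `survey` panic.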

§Survey Cancellation

Sending a new survey automatically cancels any ongoing survey. Late responses to the cancelled survey will be discarded.

§Cancellation safety

This function is cancellation safe. If cancelled before the survey is sent, the message will be dropped. If the returned response stream is dropped, the survey remains active until its timeout expires or a new survey is sent on this context. Dropping the stream simply means you won’t receive the responses, but respondents can still send them.

§Errors

Returns Err((error, message)) if the survey operation fails, giving you back the message for potential retry.

§Examples
use anng::{protocols::survey0, Message};
use std::{io::Write, time::Duration};

let socket = survey0::Surveyor0::listen(c"inproc://vote").await?;
let mut ctx = socket.context();

let mut survey = Message::with_capacity(100);
write!(&mut survey, "Should we upgrade to version 2.0?")?;

let timeout = Duration::from_secs(1);
match ctx.survey(survey, timeout).await {
    Ok(mut responses) => {
        while let Some(response) = responses.next().await {
            match response {
                Ok(msg) => println!("Vote: {:?}", msg.as_slice()),
                Err(e) => println!("Survey error: {:?}", e),
            }
        }
        println!("Survey completed");
    }
    Err((error, msg)) => {
        println!("Failed to send survey: {:?}", error);
        // Could retry with `msg`
    }
}

impl<'socket> ContextfulSocket<'socket, Respondent0>

pub async fn next_survey<'s>( &'s mut self, ) -> Result<(Message, SurveyResponder<'s, 'socket>), AioError>

Receives the next survey and returns the message along with a responder.

This method implements the respondent side of the Surveyor/Respondent pattern. It waits for an incoming survey and returns both the survey message and a SurveyResponder that can be used to send a response.

§Responding to a survey

The returned SurveyResponder allows you to send a response, but responding is entirely optional. You can choose to:

  • Respond immediately with SurveyResponder::respond
  • Drop the responder to not respond to this survey
  • Store the responder and respond later (within the timeout)

Every survey has a timeout set by the surveyor, and if the response is not received within the timeout, it is discarded. This timeout is invisible to the respondents.

§Cancellation safety

This function is cancellation safe. If cancelled after a survey message has been received, that message will be returned by the next call to next_survey() on the same context. No survey messages are lost due to cancellation.

§Examples
use anng::{protocols::survey0, Message};
use std::io::Write;

let socket = survey0::Respondent0::dial(c"inproc://service-discovery").await?;
let mut ctx = socket.context();

loop {
    let (survey, responder) = ctx.next_survey().await?;

    // Check if we want to respond to this survey
    if survey.as_slice() == b"health-check" {
        let mut response = Message::with_capacity(50);
        write!(&mut response, "healthy")?;

        // TODO: In production, handle error and retry with returned responder and message
        responder.respond(response).await.unwrap();
    }
    // Otherwise, ignore the survey by not using the responder
}

pub fn try_next_survey<'s>( &'s mut self, ) -> Result<Option<(Message, SurveyResponder<'s, 'socket>)>, AioError>

Non-blocking variant of next_survey.

Returns Ok(Some((message, responder))) if immediately available, Ok(None) if no survey is available.

impl<'socket> ContextfulSocket<'socket, Sub0>

pub async fn next(&mut self) -> Result<Message, AioError>

Receives the next message matching the socket’s subscriptions.

This method waits for the next published message that matches any of the socket’s topic subscriptions. If no subscriptions have been set up, this will never return a message.

§Message ordering

Messages are delivered in the order they arrive, but there are no guarantees about ordering across different publishers or topics.

§Cancellation safety

This function is cancellation safe. If cancelled after a message is received, that message will be returned by the next call to next() on the same context. Note that the PUB/SUB protocol allows message loss by design: messages may be dropped during normal operation due to slow subscribers or network conditions, independent of cancellation.

§Examples
use anng::protocols::pubsub0;

let socket = pubsub0::Sub0::dial(c"inproc://next-doctest-feed").await?;
let mut sub = socket.context();

sub.subscribe_to(b"news.");

loop {
    let msg = sub.next().await?;
    println!("Received: {:?}", msg.as_slice());
}

pub fn try_next(&mut self) -> Result<Option<Message>, AioError>

Non-blocking variant of next.

Returns Ok(Some(message)) if immediately available, Ok(None) if no message is available.
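
One common use of a `Result<Option<_>, _>` return type like this is draining everything that is already queued without waiting. The sketch below shows that loop in isolation; `drain_ready` is a hypothetical helper that takes any closure with the same return shape, so it can be read and tested without a socket.

```rust
/// Hypothetical helper: repeatedly calls `try_next` until it reports that
/// nothing more is immediately available, collecting what it yielded.
/// The first error stops the loop and is returned to the caller.
fn drain_ready<T, E>(
    mut try_next: impl FnMut() -> Result<Option<T>, E>,
) -> Result<Vec<T>, E> {
    let mut ready = Vec::new();
    // `Ok(Some(item))` continues, `Ok(None)` ends the loop, `Err(e)` propagates via `?`.
    while let Some(item) = try_next()? {
        ready.push(item);
    }
    Ok(ready)
}
```

With a subscriber context this would presumably be called as `drain_ready(|| sub.try_next())`, assuming the context is in scope as `sub`.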

pub fn prefer_new(&mut self, prefer: bool)

Sets the preference for what to do with new messages when the receive queue is full.

When true (the default), the oldest message is dropped to make room for the new one. When false, the new message is rejected and the older ones are preserved.
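
The two policies can be modeled with a small bounded queue. This is a sketch of the described behavior only, written against the standard library; `BoundedQueue` is a hypothetical type and says nothing about how NNG stores messages internally.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded receive queue illustrating the two policies.
struct BoundedQueue {
    items: VecDeque<Vec<u8>>,
    capacity: usize,
    prefer_new: bool,
}

impl BoundedQueue {
    fn new(capacity: usize, prefer_new: bool) -> Self {
        assert!(capacity > 0, "capacity must be at least 1");
        Self { items: VecDeque::with_capacity(capacity), capacity, prefer_new }
    }

    /// Enqueues `message` and returns whichever message was discarded, if any.
    fn push(&mut self, message: Vec<u8>) -> Option<Vec<u8>> {
        if self.items.len() < self.capacity {
            self.items.push_back(message);
            return None;
        }
        if self.prefer_new {
            // Queue full, prefer new: evict the oldest message.
            let dropped = self.items.pop_front();
            self.items.push_back(message);
            dropped
        } else {
            // Queue full, prefer old: reject the incoming message.
            Some(message)
        }
    }
}
```

With `prefer_new` set, a slow consumer sees the most recent data; without it, the consumer sees the earliest data that arrived after the queue last had room.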

pub fn disable_filtering(&mut self)

Disables topic filtering to receive all published messages.

This is equivalent to subscribing to an empty topic (b""), which matches all messages. After calling this method, the socket will receive every message published to the connected publishers.

Note: Subsequent subscribe_to() calls are effectively ignored since the empty subscription matches all messages regardless of other filters.

§Performance impact

Disabling filtering means receiving all messages, which can impact performance in high-volume scenarios. Use specific subscriptions when possible to reduce network and processing overhead.

§Examples
use anng::protocols::pubsub0;

let socket = pubsub0::Sub0::dial(c"inproc://sub0-disable-filtering-demo").await?;
let mut sub = socket.context();

// Receive all messages regardless of topic
sub.disable_filtering();

// This will now receive every published message
loop {
    let msg = sub.next().await?;
    println!("Got: {:?}", msg.as_slice());
}

pub fn subscribe_to(&mut self, topic: &[u8])

Subscribes to messages with the specified topic prefix.

Only messages whose content starts with the given topic prefix will be delivered to this socket. Multiple subscriptions can be active simultaneously; a message matching any subscription will be delivered.

§Topic matching

Topic matching is based on byte-level prefix comparison:

  • b"news." matches b"news.sports", b"news.weather", etc.
  • b"" (empty) matches all messages (equivalent to disable_filtering)
  • Matching is case-sensitive and exact
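
The rules above amount to a byte-wise `starts_with` check. A minimal sketch using only the standard library; `topic_matches` and `is_delivered` are hypothetical helpers written for illustration and are not part of anng:

```rust
/// Hypothetical helper: true if `message` begins with the bytes of `subscription`.
fn topic_matches(subscription: &[u8], message: &[u8]) -> bool {
    // Byte-level comparison: no case folding, no wildcards.
    message.starts_with(subscription)
}

/// Hypothetical helper: a message is delivered if any active subscription matches.
fn is_delivered(subscriptions: &[&[u8]], message: &[u8]) -> bool {
    subscriptions.iter().any(|topic| topic_matches(topic, message))
}
```

Note that the empty slice is a prefix of every message, which is why subscribing to `b""` matches everything.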

§Performance

Topic filtering happens at the subscriber side, so all messages are still transmitted over the network. For high-volume scenarios with many unwanted messages, consider protocol-level filtering if available.

§Examples
use anng::protocols::pubsub0;

let socket = pubsub0::Sub0::dial(c"inproc://sub0-subscribe-to-demo").await?;
let mut sub = socket.context();

// Subscribe to specific topics
sub.subscribe_to(b"weather.forecast");
sub.subscribe_to(b"news.breaking");
sub.subscribe_to(b"alerts."); // Matches all alerts

// Will receive any message starting with these prefixes

pub fn unsubscribe_from(&mut self, topic: &[u8]) -> bool

Removes a subscription for the specified topic prefix.

After calling this method, messages with the specified prefix will no longer be delivered to this socket (unless they match other active subscriptions).

§Return Value

Returns true if the subscription was previously active and has been removed, and false otherwise.
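
The interaction between the return value and overlapping subscriptions can be modeled with a set of prefixes. In this sketch `Subscriptions` is a hypothetical type, and it assumes that a subscription is removed only when the exact same byte string was subscribed earlier; the source describes the return value but not the lookup rule.

```rust
use std::collections::HashSet;

/// Hypothetical model of a subscriber's active topic prefixes.
#[derive(Default)]
struct Subscriptions {
    topics: HashSet<Vec<u8>>,
}

impl Subscriptions {
    fn subscribe_to(&mut self, topic: &[u8]) {
        self.topics.insert(topic.to_vec());
    }

    /// Returns true only if this exact prefix was active and is now removed.
    fn unsubscribe_from(&mut self, topic: &[u8]) -> bool {
        self.topics.remove(topic)
    }

    /// A message is delivered if it starts with any remaining prefix.
    fn is_delivered(&self, message: &[u8]) -> bool {
        self.topics.iter().any(|topic| message.starts_with(topic))
    }
}
```

In this model, removing `b"news."` while `b"news.sports."` is still active stops delivery of `news.weather` messages but not of `news.sports.` ones.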

§Examples
use anng::protocols::pubsub0;

let socket = pubsub0::Sub0::dial(c"inproc://sub0-unsubscribe-demo").await?;
let mut sub = socket.context();

sub.subscribe_to(b"news.");
sub.subscribe_to(b"weather.");

// Remove news subscription
let was_subscribed = sub.unsubscribe_from(b"news.");
assert!(was_subscribed);

// Try to remove non-existent subscription
let was_subscribed = sub.unsubscribe_from(b"sports.");
assert!(!was_subscribed);

impl<'socket, Protocol> ContextfulSocket<'socket, Protocol>

pub fn socket(&self) -> &'socket Socket<Protocol>

Returns a reference to the shared socket underpinning this context.

Note that any operation performed on this socket will affect all associated contexts!

Trait Implementations§

impl<Protocol> Debug for ContextfulSocket<'_, Protocol>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<'socket, Protocol> Freeze for ContextfulSocket<'socket, Protocol>

impl<'socket, Protocol> RefUnwindSafe for ContextfulSocket<'socket, Protocol>
where Protocol: RefUnwindSafe,

impl<'socket, Protocol> Send for ContextfulSocket<'socket, Protocol>
where Protocol: Sync,

impl<'socket, Protocol> Sync for ContextfulSocket<'socket, Protocol>
where Protocol: Sync,

impl<'socket, Protocol> Unpin for ContextfulSocket<'socket, Protocol>

impl<'socket, Protocol> UnwindSafe for ContextfulSocket<'socket, Protocol>
where Protocol: RefUnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.