RpcSession

Struct RpcSession 

Source
pub struct RpcSession<T: Transport> { /* private fields */ }

RpcSession owns a transport and multiplexes frames between clients and servers.

§Key invariant

Only RpcSession::run() calls transport.recv_frame(). No other code should touch recv_frame directly. This prevents the race condition where multiple callers compete for incoming frames.

Implementations§

Source§

impl<T: Transport + Send + Sync + 'static> RpcSession<T>

Source

pub fn new(transport: Arc<T>) -> Self

Create a new RPC session wrapping the given transport.

This constructor takes no start_channel_id argument. To choose the starting channel ID, use with_channel_start(), which lets different sessions use different channel ID ranges and so avoids collisions in bidirectional RPC scenarios:

  • Odd IDs (1, 3, 5, …): typically used by one side
  • Even IDs (2, 4, 6, …): typically used by the other side
Source

pub fn with_channel_start(transport: Arc<T>, start_channel_id: u32) -> Self

Create a new RPC session with a custom starting channel ID.

Use this when you need to coordinate channel IDs between two sessions. For bidirectional RPC over a single transport pair:

  • Host session: start at 1 (uses odd channel IDs)
  • Plugin session: start at 2 (uses even channel IDs)
Source

pub fn transport(&self) -> &T

Get a reference to the underlying transport.

Source

pub fn next_msg_id(&self) -> u64

Get the next message ID.

Source

pub fn next_channel_id(&self) -> u32

Get the next channel ID.

Channel IDs increment by 2 to allow interleaving between two sessions:

  • Session A starts at 1: uses 1, 3, 5, 7, …
  • Session B starts at 2: uses 2, 4, 6, 8, …

This prevents collisions in bidirectional RPC scenarios.
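The interleaving can be illustrated with a minimal sketch. `ChannelIds` and `starting_at` are hypothetical names invented for this example; the actual counter inside RpcSession is a private field and may be implemented differently.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical stand-in for the session's private channel ID counter.
struct ChannelIds {
    next: AtomicU32,
}

impl ChannelIds {
    /// Assumed constructor: the first ID handed out is `start`.
    fn starting_at(start: u32) -> Self {
        Self { next: AtomicU32::new(start) }
    }

    /// Returns the current ID and advances the counter by 2, so every ID
    /// handed out keeps the parity of the starting value.
    fn next_channel_id(&self) -> u32 {
        self.next.fetch_add(2, Ordering::Relaxed)
    }
}
```

With one counter starting at 1 and another at 2, the two sequences never share a value as long as neither counter wraps around `u32::MAX`.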

Source

pub fn pending_channel_ids(&self) -> Vec<u32>

Get the channel IDs of pending RPC calls (for diagnostics).

Returns a sorted list of channel IDs that are waiting for responses.

Source

pub fn tunnel_channel_ids(&self) -> Vec<u32>

Get the channel IDs of active tunnels (for diagnostics).

Returns a sorted list of channel IDs with registered tunnels.

Source

pub fn set_dispatcher<F, Fut>(&self, dispatcher: F)
where F: Fn(u32, u32, Vec<u8>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Frame, RpcError>> + Send + 'static,

Register a dispatcher for incoming requests.

The dispatcher receives (channel_id, method_id, payload) and returns a response frame. If no dispatcher is registered, incoming requests are dropped with a warning.
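As a rough sketch of a function that fits the documented bounds, consider the following. `Frame` and `RpcError` here are simplified stand-ins (the crate's real types are not shown on this page), and the method IDs and the `assert_dispatcher` helper are hypothetical.

```rust
use std::future::Future;

// Hypothetical stand-ins: the real `Frame` and `RpcError` come from the crate.
#[derive(Debug, PartialEq)]
struct Frame {
    channel_id: u32,
    payload: Vec<u8>,
}

#[derive(Debug, PartialEq)]
enum RpcError {
    UnknownMethod(u32),
}

// Hypothetical method IDs, for illustration only.
const METHOD_ECHO: u32 = 1;
const METHOD_LEN: u32 = 2;

/// Takes (channel_id, method_id, payload) and resolves to a response frame,
/// matching the argument order described for `set_dispatcher`.
async fn dispatch(channel_id: u32, method_id: u32, payload: Vec<u8>) -> Result<Frame, RpcError> {
    match method_id {
        // Echo the request payload back on the same channel.
        METHOD_ECHO => Ok(Frame { channel_id, payload }),
        // Reply with the payload length as 4 little-endian bytes.
        METHOD_LEN => Ok(Frame {
            channel_id,
            payload: (payload.len() as u32).to_le_bytes().to_vec(),
        }),
        other => Err(RpcError::UnknownMethod(other)),
    }
}

/// Compile-time check that a value satisfies the bounds listed above.
fn assert_dispatcher<F, Fut>(_f: &F)
where
    F: Fn(u32, u32, Vec<u8>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<Frame, RpcError>> + Send + 'static,
{
}
```

Under these assumptions, a plain `async fn` can be passed where the bounds ask for `F`, because its future is `Send` and `'static` when it only owns its arguments.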

Source

pub fn register_tunnel(&self, channel_id: u32) -> Receiver<TunnelChunk>

Register a tunnel on the given channel.

Returns a receiver that will receive TunnelChunks as DATA frames arrive on the channel. The tunnel is active until:

  • An EOS frame is received (final chunk has is_eos = true)
  • close_tunnel() is called
  • The receiver is dropped
§Panics

Panics if a tunnel is already registered on this channel.
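The registration rules can be sketched with a small registry built on `std::sync::mpsc`. This is only an illustration: `TunnelRegistry`, its methods, and the simplified `TunnelChunk` are hypothetical, the crate's `Receiver` is likely an async channel rather than the std one, and the sketch models only the EOS and dropped-receiver cases.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical, simplified stand-in for the crate's `TunnelChunk`.
#[derive(Debug, PartialEq)]
struct TunnelChunk {
    payload: Vec<u8>,
    is_eos: bool,
}

/// Hypothetical registry mapping channel IDs to tunnel senders.
#[derive(Default)]
struct TunnelRegistry {
    tunnels: HashMap<u32, Sender<TunnelChunk>>,
}

impl TunnelRegistry {
    /// Registers a tunnel; panics if the channel already has one.
    fn register(&mut self, channel_id: u32) -> Receiver<TunnelChunk> {
        assert!(
            !self.tunnels.contains_key(&channel_id),
            "tunnel already registered on channel {channel_id}"
        );
        let (tx, rx) = channel();
        self.tunnels.insert(channel_id, tx);
        rx
    }

    /// Routes one incoming DATA frame. Returns false if no tunnel took it.
    fn route(&mut self, channel_id: u32, payload: Vec<u8>, is_eos: bool) -> bool {
        let Some(tx) = self.tunnels.get(&channel_id) else {
            return false;
        };
        let delivered = tx.send(TunnelChunk { payload, is_eos }).is_ok();
        // Drop the entry on EOS, or when the receiver has gone away.
        if is_eos || !delivered {
            self.tunnels.remove(&channel_id);
        }
        delivered
    }

    /// Sorted IDs of channels that still have a registered tunnel.
    fn tunnel_channel_ids(&self) -> Vec<u32> {
        let mut ids: Vec<u32> = self.tunnels.keys().copied().collect();
        ids.sort_unstable();
        ids
    }
}
```

Removing the sender on EOS closes the channel, so a consumer sees the end of the stream after draining the final chunk.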

Source

pub async fn send_chunk( &self, channel_id: u32, payload: Vec<u8>, ) -> Result<(), RpcError>

Send a chunk on a tunnel channel.

This sends a DATA frame on the channel. The chunk is not marked with EOS; use close_tunnel() to send the final chunk.

Source

pub async fn close_tunnel(&self, channel_id: u32) -> Result<(), RpcError>

Close a tunnel by sending EOS (half-close).

This sends a final DATA|EOS frame (with empty payload) to signal the end of the outgoing stream. The tunnel receiver remains active to receive the peer’s remaining chunks until they also send EOS.

After calling this, no more chunks should be sent on this channel.
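Half-close semantics can be pictured as two independent flags per channel. The `TunnelState` type below is a hypothetical model for illustration; how the session actually tracks this is not documented here.

```rust
/// Hypothetical per-channel state; the crate's internal bookkeeping may differ.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct TunnelState {
    local_eos_sent: bool,
    remote_eos_received: bool,
}

impl TunnelState {
    /// Mirrors `close_tunnel()`: the sending half ends, receiving continues.
    fn close_local(&mut self) {
        self.local_eos_sent = true;
    }

    /// Called when the peer's DATA|EOS frame arrives.
    fn on_remote_eos(&mut self) {
        self.remote_eos_received = true;
    }

    /// Chunks may be sent only until the local EOS has gone out.
    fn can_send(&self) -> bool {
        !self.local_eos_sent
    }

    /// Chunks may still arrive until the peer sends its own EOS.
    fn can_receive(&self) -> bool {
        !self.remote_eos_received
    }

    /// The tunnel is finished once both directions have seen EOS.
    fn is_fully_closed(&self) -> bool {
        self.local_eos_sent && self.remote_eos_received
    }
}
```

In this model, closing the local half leaves `can_receive()` true, which matches the description of the receiver staying active until the peer also sends EOS.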

Source

pub fn unregister_tunnel(&self, channel_id: u32)

Unregister a tunnel without sending EOS.

Use this when the tunnel was closed by the remote side (you received EOS) and you want to clean up without sending another EOS.

Source

pub async fn start_streaming_call( &self, method_id: u32, payload: Vec<u8>, ) -> Result<Receiver<TunnelChunk>, RpcError>

Start a streaming RPC call.

This sends the request and returns a receiver for streaming responses. Unlike call(), this doesn’t wait for a single response; instead, responses are routed to the returned receiver as TunnelChunks.

The caller should:

  1. Consume chunks from the receiver
  2. Check chunk.is_error and parse as error if true
  3. Otherwise deserialize chunk.payload as the expected type
  4. Stop when chunk.is_eos is true
§Example
// `mut` is needed if the receiver's recv() takes `&mut self` (as with tokio's mpsc receiver).
let mut rx = session.start_streaming_call(method_id, payload).await?;
while let Some(chunk) = rx.recv().await {
    if chunk.is_error {
        let err = parse_error_payload(&chunk.payload);
        return Err(err);
    }
    if chunk.is_eos && chunk.payload.is_empty() {
        break; // Stream ended normally
    }
    let item: T = deserialize(&chunk.payload)?;
    // process item...
    if chunk.is_eos {
        break; // Final chunk carried a payload
    }
}
Source

pub async fn send_response(&self, frame: &Frame) -> Result<(), RpcError>

Send a response frame.

Source

pub async fn run(self: Arc<Self>) -> Result<(), TransportError>

Run the demux loop.

This is the main event loop that:

  1. Receives frames from the transport
  2. Routes tunnel frames to registered tunnel receivers
  3. Routes responses to waiting clients
  4. Dispatches requests to the registered handler

This method consumes the Arc<Self> handle it is called on (other clones of the Arc remain usable) and runs until the transport closes.
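The routing order in the list above can be sketched synchronously. Everything here is an assumption made for illustration: `Demux`, `Routed`, and the simplified `Frame` are hypothetical, and the sketch classifies frames purely by channel ID, whereas the real loop presumably also inspects frame flags.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified frame; the crate's `Frame` carries more than this.
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    channel_id: u32,
    payload: Vec<u8>,
}

/// Where one incoming frame ended up.
#[derive(Debug, PartialEq)]
enum Routed {
    Tunnel,
    Response,
    Request,
}

/// Hypothetical routing state, using plain collections instead of channels.
#[derive(Default)]
struct Demux {
    tunnels: HashMap<u32, Vec<Frame>>, // frames queued per registered tunnel
    pending: HashSet<u32>,             // channels with a call awaiting a response
    responses: HashMap<u32, Frame>,    // responses delivered to waiting callers
    requests: Vec<Frame>,              // frames handed to the dispatcher
}

impl Demux {
    /// Applies the order: tunnel first, then pending call, then dispatcher.
    fn route(&mut self, frame: Frame) -> Routed {
        if let Some(queue) = self.tunnels.get_mut(&frame.channel_id) {
            queue.push(frame);
            Routed::Tunnel
        } else if self.pending.remove(&frame.channel_id) {
            self.responses.insert(frame.channel_id, frame);
            Routed::Response
        } else {
            self.requests.push(frame);
            Routed::Request
        }
    }

    /// The only place that pulls frames from the source, mirroring the
    /// single-reader invariant. Runs until the source is exhausted.
    fn run(&mut self, incoming: impl Iterator<Item = Frame>) {
        for frame in incoming {
            self.route(frame);
        }
    }
}
```

Keeping all reads inside one loop means each frame is seen exactly once and routed by a single decision, which is the property the key invariant is meant to protect.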

Auto Trait Implementations§

§

impl<T> !Freeze for RpcSession<T>

§

impl<T> !RefUnwindSafe for RpcSession<T>

§

impl<T> Send for RpcSession<T>

§

impl<T> Sync for RpcSession<T>

§

impl<T> Unpin for RpcSession<T>

§

impl<T> !UnwindSafe for RpcSession<T>

Blanket Implementations§

§

impl<T> Any for T
where T: 'static + ?Sized,

§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
§

impl<T> Borrow<T> for T
where T: ?Sized,

§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
§

impl<T> BorrowMut<T> for T
where T: ?Sized,

§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
§

impl<T> From<T> for T

§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
§

impl<T, U> Into<U> for T
where U: From<T>,

§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> TimeoutExt for T

Source§

fn timeout(self, duration: Duration) -> Timeout<Self>

Requires a Future or Stream to complete before the specific duration has elapsed.
§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.