pub struct RpcSession { /* private fields */ }
RpcSession owns a transport and multiplexes frames between clients and servers.
§Key invariant
Only RpcSession::run() calls transport.recv_frame(). No other code should
touch recv_frame directly. This prevents the race condition where multiple
callers compete for incoming frames.
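The single-reader rule can be illustrated with a minimal sketch using standard-library channels instead of the real transport. Everything here is an assumption for illustration: `Frame` is reduced to a `(channel_id, payload)` pair, and `spawn_single_reader` is a hypothetical helper, not part of this crate. Only the spawned reader ever touches the incoming queue; everyone else waits on a per-channel receiver.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;

/// Simplified frame for this sketch: (channel_id, payload).
pub type Frame = (u32, Vec<u8>);

/// Spawns the single reader. It is the only code that touches `transport_rx`,
/// mirroring the rule that only run() calls recv_frame().
pub fn spawn_single_reader(
    transport_rx: Receiver<Frame>,
    routes: HashMap<u32, Sender<Vec<u8>>>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // The loop ends once every sender for the transport queue is dropped.
        for (channel_id, payload) in transport_rx {
            if let Some(tx) = routes.get(&channel_id) {
                // Ignore send errors: the waiting caller may have gone away.
                let _ = tx.send(payload);
            }
        }
    })
}
```

Because each caller reads only from its own receiver, two callers can never steal each other's frames, whatever order the frames arrive in.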
Implementations§
impl RpcSession
pub fn new(transport: Transport) -> Self
Create a new RPC session wrapping the given transport handle.
new() takes no start_channel_id parameter; to choose the starting channel ID, use
with_channel_start(). Giving each session a different starting ID keeps their
channel ID ranges disjoint, avoiding collisions in bidirectional RPC scenarios.
- Odd IDs (1, 3, 5, …): typically used by one side
- Even IDs (2, 4, 6, …): typically used by the other side
pub fn with_channel_start(transport: Transport, start_channel_id: u32) -> Self
Create a new RPC session with a custom starting channel ID.
Use this when you need to coordinate channel IDs between two sessions. For bidirectional RPC over a single transport pair:
- Host session: start at 1 (uses odd channel IDs)
- Plugin session: start at 2 (uses even channel IDs)
pub fn close(&self)
Close the underlying transport.
This signals the transport to shut down. The run() loop will exit
once the transport is closed and all pending frames are processed.
pub fn next_msg_id(&self) -> u64
Get the next message ID.
pub fn next_channel_id(&self) -> u32
Get the next channel ID.
Channel IDs increment by 2 to allow interleaving between two sessions:
- Session A starts at 1: uses 1, 3, 5, 7, …
- Session B starts at 2: uses 2, 4, 6, 8, …
This prevents collisions in bidirectional RPC scenarios.
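The increment-by-2 scheme can be sketched as follows. This is a minimal illustration, not the crate's implementation: `ChannelIdAllocator` is a hypothetical type, and the use of an atomic counter is an assumption suggested only by the `&self` receiver on next_channel_id().

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical stand-in for the session's channel ID counter.
pub struct ChannelIdAllocator {
    next: AtomicU32,
}

impl ChannelIdAllocator {
    /// `start` plays the role of `start_channel_id` (1 for one side, 2 for the other).
    pub fn new(start: u32) -> Self {
        Self { next: AtomicU32::new(start) }
    }

    /// Returns the current ID and advances the counter by 2,
    /// so every ID keeps the parity of the starting ID.
    pub fn next_channel_id(&self) -> u32 {
        self.next.fetch_add(2, Ordering::Relaxed)
    }
}
```

Since each side only ever hands out IDs of its own parity, the two sequences cannot overlap, and the sides need no further coordination after agreeing on the starting values.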
pub fn pending_channel_ids(&self) -> Vec<u32>
Get the channel IDs of pending RPC calls (for diagnostics).
Returns a sorted list of channel IDs that are waiting for responses.
pub fn tunnel_channel_ids(&self) -> Vec<u32>
Get the channel IDs of active tunnels (for diagnostics).
Returns a sorted list of channel IDs with registered tunnels.
pub fn set_dispatcher<F, Fut>(&self, dispatcher: F)
Register a dispatcher for incoming requests.
The dispatcher receives the request frame and returns a response frame. If no dispatcher is registered, incoming requests are dropped with a warning.
pub fn register_tunnel(&self, channel_id: u32) -> Receiver<TunnelChunk>
Register a tunnel on the given channel.
Returns a receiver that will receive TunnelChunks as DATA frames arrive
on the channel. The tunnel is active until:
- An EOS frame is received (final chunk has is_eos = true)
- close_tunnel() is called
- The receiver is dropped
§Panics
Panics if a tunnel is already registered on this channel.
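The registration and teardown rules above can be sketched with a small registry. This is a simplified model under stated assumptions: `TunnelRegistry` is hypothetical, `TunnelChunk` is reduced to two fields, and a standard-library channel stands in for whatever `Receiver` type the crate actually returns.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for the chunk type (assumed fields only).
#[derive(Debug, PartialEq)]
pub struct TunnelChunk {
    pub payload: Vec<u8>,
    pub is_eos: bool,
}

/// Hypothetical registry mapping channel IDs to tunnel senders.
#[derive(Default)]
pub struct TunnelRegistry {
    tunnels: HashMap<u32, Sender<TunnelChunk>>,
}

impl TunnelRegistry {
    /// Registers a tunnel; panics if the channel already has one.
    pub fn register(&mut self, channel_id: u32) -> Receiver<TunnelChunk> {
        assert!(
            !self.tunnels.contains_key(&channel_id),
            "tunnel already registered on channel {}",
            channel_id
        );
        let (tx, rx) = channel();
        self.tunnels.insert(channel_id, tx);
        rx
    }

    /// Routes a chunk to its tunnel and returns true if it was delivered.
    /// The tunnel is removed after an EOS chunk or once the receiver is gone.
    pub fn route(&mut self, channel_id: u32, chunk: TunnelChunk) -> bool {
        let Some(tx) = self.tunnels.get(&channel_id) else {
            return false;
        };
        let is_eos = chunk.is_eos;
        let delivered = tx.send(chunk).is_ok();
        if is_eos || !delivered {
            self.tunnels.remove(&channel_id);
        }
        delivered
    }

    /// True while a tunnel is registered on the channel.
    pub fn is_registered(&self, channel_id: u32) -> bool {
        self.tunnels.contains_key(&channel_id)
    }
}
```

In this model a dropped receiver is only noticed on the next delivery attempt; whether the real session detects it earlier is not stated in the documentation.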
pub fn open_tunnel_stream(self: &Arc<Self>) -> (TunnelHandle, TunnelStream)
Allocate a fresh tunnel channel ID and return a first-class tunnel stream.
This is a convenience wrapper around next_channel_id() + register_tunnel().
pub fn tunnel_stream(self: &Arc<Self>, channel_id: u32) -> TunnelStream
Create a first-class tunnel stream for an existing channel ID.
This registers the tunnel receiver immediately.
pub async fn send_chunk( &self, channel_id: u32, payload: Vec<u8>, ) -> Result<(), RpcError>
Send a chunk on a tunnel channel.
This sends a DATA frame on the channel. The chunk is not marked with EOS;
use close_tunnel() to send the final chunk.
pub async fn close_tunnel(&self, channel_id: u32) -> Result<(), RpcError>
Close a tunnel by sending EOS (half-close).
This sends a final DATA|EOS frame (with empty payload) to signal the end of the outgoing stream. The tunnel receiver remains active to receive the peer’s remaining chunks until they also send EOS.
After calling this, no more chunks should be sent on this channel.
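Half-close semantics can be modeled as two independent flags, one per direction. The sketch below is illustrative only: `TunnelState` and its methods are hypothetical names, not types from this crate.

```rust
/// Hypothetical per-tunnel state: each direction closes independently.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct TunnelState {
    pub local_eos_sent: bool,
    pub remote_eos_received: bool,
}

impl TunnelState {
    /// Sending is allowed until our own EOS has been sent.
    pub fn can_send(&self) -> bool {
        !self.local_eos_sent
    }

    /// Receiving continues until the peer sends EOS.
    pub fn can_receive(&self) -> bool {
        !self.remote_eos_received
    }

    /// Models close_tunnel(): closes only the outgoing direction.
    pub fn close_local(&mut self) {
        self.local_eos_sent = true;
    }

    /// Models the arrival of a chunk with is_eos = true.
    pub fn on_remote_eos(&mut self) {
        self.remote_eos_received = true;
    }

    /// The tunnel is finished once both directions have seen EOS.
    pub fn is_fully_closed(&self) -> bool {
        self.local_eos_sent && self.remote_eos_received
    }
}
```

After `close_local()`, `can_send()` turns false while `can_receive()` stays true, which matches the description above: the peer's remaining chunks still arrive until the peer sends its own EOS.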
pub fn unregister_tunnel(&self, channel_id: u32)
Unregister a tunnel without sending EOS.
Use this when the tunnel was closed by the remote side (you received EOS) and you want to clean up without sending another EOS.
pub async fn start_streaming_call( &self, method_id: u32, payload: Vec<u8>, ) -> Result<Receiver<TunnelChunk>, RpcError>
Start a streaming RPC call.
This sends the request and returns a receiver for streaming responses.
Unlike call(), this doesn’t wait for a single response - instead,
responses are routed to the returned receiver as TunnelChunks.
The caller should:
- Consume chunks from the receiver
- Check chunk.is_error and parse as error if true
- Otherwise deserialize chunk.payload as the expected type
- Stop when chunk.is_eos is true
§Example
let mut rx = session.start_streaming_call(method_id, payload).await?;
while let Some(chunk) = rx.recv().await {
    if chunk.is_error {
        let err = parse_error_payload(&chunk.payload);
        return Err(err);
    }
    if chunk.is_eos && chunk.payload.is_empty() {
        break; // Stream ended normally
    }
    let item: T = deserialize(&chunk.payload)?;
    // process item...
    if chunk.is_eos {
        break; // Final chunk carried a payload
    }
}
pub async fn notify( &self, method_id: u32, payload: Vec<u8>, ) -> Result<(), RpcError>
Send a request frame without registering a waiter or waiting for a reply.
This is useful for fire-and-forget notifications (e.g. tracing events).
The request is sent on channel 0 (the “no channel” channel). The receiver
may still dispatch it like a normal unary RPC request, but if it honors
FrameFlags::NO_REPLY it will not send a response frame.
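How a receiver might honor NO_REPLY can be sketched as below. The flag names DATA, EOS, and NO_REPLY come from this documentation, but the bit values, the `empty()` and `contains()` helpers, the `Frame` layout, and `handle_request` are all assumptions made for illustration, not the crate's actual definitions.

```rust
/// Hypothetical flag set; the bit values are assumptions for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameFlags(u32);

impl FrameFlags {
    pub const DATA: FrameFlags = FrameFlags(1 << 0);
    pub const EOS: FrameFlags = FrameFlags(1 << 1);
    pub const NO_REPLY: FrameFlags = FrameFlags(1 << 2);

    /// A flag set with no bits set.
    pub fn empty() -> FrameFlags {
        FrameFlags(0)
    }

    /// True if every bit of `other` is also set in `self`.
    pub fn contains(self, other: FrameFlags) -> bool {
        self.0 & other.0 == other.0
    }
}

impl std::ops::BitOr for FrameFlags {
    type Output = FrameFlags;
    fn bitor(self, rhs: FrameFlags) -> FrameFlags {
        FrameFlags(self.0 | rhs.0)
    }
}

/// Simplified request frame: only the fields this sketch needs.
pub struct Frame {
    pub channel_id: u32,
    pub flags: FrameFlags,
    pub payload: Vec<u8>,
}

/// Receiver side: always dispatch, but produce a reply only when
/// the sender did not set NO_REPLY.
pub fn handle_request(
    frame: &Frame,
    dispatch: impl Fn(&[u8]) -> Vec<u8>,
) -> Option<Vec<u8>> {
    let response = dispatch(&frame.payload);
    if frame.flags.contains(FrameFlags::NO_REPLY) {
        None
    } else {
        Some(response)
    }
}
```

The dispatcher still runs for a notification, so side effects such as recording a tracing event happen either way; only the response frame is suppressed.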
pub async fn run(self: Arc<Self>) -> Result<(), TransportError>
Run the demux loop.
This is the main event loop that:
- Receives frames from the transport
- Routes tunnel frames to registered tunnel receivers
- Routes responses to waiting clients
- Dispatches requests to the registered handler
This method consumes the Arc<Self> it is called on and runs until the transport closes. Clone the Arc beforehand if other tasks still need access to the session.
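The routing decision inside the loop can be sketched as a pure function. This is an assumption-laden illustration: `Route` and `classify` are hypothetical names, and the precedence (tunnels first, then pending calls, then the dispatcher) is inferred only from the order of the list above.

```rust
use std::collections::HashSet;

/// Where an incoming frame should go (hypothetical enum for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum Route {
    /// Forward to the tunnel receiver registered on this channel.
    Tunnel,
    /// Wake the client waiting for a response on this channel.
    Response,
    /// Hand the frame to the registered dispatcher as a new request.
    Request,
}

/// Classifies a frame by its channel ID, checking tunnels first,
/// then pending calls, and treating everything else as a request.
pub fn classify(
    channel_id: u32,
    tunnels: &HashSet<u32>,
    pending: &HashSet<u32>,
) -> Route {
    if tunnels.contains(&channel_id) {
        Route::Tunnel
    } else if pending.contains(&channel_id) {
        Route::Response
    } else {
        Route::Request
    }
}
```

The two sets correspond to what tunnel_channel_ids() and pending_channel_ids() expose for diagnostics, so comparing their output against incoming channel IDs is one way to reason about where a frame would have been routed.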