pub struct RpcSession<T: Transport> { /* private fields */ }Expand description
RpcSession owns a transport and multiplexes frames between clients and servers.
§Key invariant
Only RpcSession::run() calls transport.recv_frame(). No other code should
touch recv_frame directly. This prevents the race condition where multiple
callers compete for incoming frames.
Implementations§
Source§impl<T: Transport + Send + Sync + 'static> RpcSession<T>
impl<T: Transport + Send + Sync + 'static> RpcSession<T>
Sourcepub fn new(transport: Arc<T>) -> Self
pub fn new(transport: Arc<T>) -> Self
Create a new RPC session wrapping the given transport.
The start_channel_id parameter allows different sessions to use different
channel ID ranges, avoiding collisions in bidirectional RPC scenarios.
- Odd IDs (1, 3, 5, …): typically used by one side
- Even IDs (2, 4, 6, …): typically used by the other side
Sourcepub fn with_channel_start(transport: Arc<T>, start_channel_id: u32) -> Self
pub fn with_channel_start(transport: Arc<T>, start_channel_id: u32) -> Self
Create a new RPC session with a custom starting channel ID.
Use this when you need to coordinate channel IDs between two sessions. For bidirectional RPC over a single transport pair:
- Host session: start at 1 (uses odd channel IDs)
- Plugin session: start at 2 (uses even channel IDs)
Sourcepub fn next_msg_id(&self) -> u64
pub fn next_msg_id(&self) -> u64
Get the next message ID.
Sourcepub fn next_channel_id(&self) -> u32
pub fn next_channel_id(&self) -> u32
Get the next channel ID.
Channel IDs increment by 2 to allow interleaving between two sessions:
- Session A starts at 1: uses 1, 3, 5, 7, …
- Session B starts at 2: uses 2, 4, 6, 8, …
This prevents collisions in bidirectional RPC scenarios.
Sourcepub fn set_dispatcher<F, Fut>(&self, dispatcher: F)
pub fn set_dispatcher<F, Fut>(&self, dispatcher: F)
Register a dispatcher for incoming requests.
The dispatcher receives (channel_id, method_id, payload) and returns a response frame. If no dispatcher is registered, incoming requests are dropped with a warning.
Sourcepub fn register_tunnel(&self, channel_id: u32) -> Receiver<TunnelChunk>
pub fn register_tunnel(&self, channel_id: u32) -> Receiver<TunnelChunk>
Register a tunnel on the given channel.
Returns a receiver that will receive TunnelChunks as DATA frames arrive
on the channel. The tunnel is active until:
- An EOS frame is received (final chunk has
is_eos = true) close_tunnel()is called- The receiver is dropped
§Panics
Panics if a tunnel is already registered on this channel.
Sourcepub async fn send_chunk(
&self,
channel_id: u32,
payload: Vec<u8>,
) -> Result<(), RpcError>
pub async fn send_chunk( &self, channel_id: u32, payload: Vec<u8>, ) -> Result<(), RpcError>
Send a chunk on a tunnel channel.
This sends a DATA frame on the channel. The chunk is not marked with EOS;
use close_tunnel() to send the final chunk.
Sourcepub async fn close_tunnel(&self, channel_id: u32) -> Result<(), RpcError>
pub async fn close_tunnel(&self, channel_id: u32) -> Result<(), RpcError>
Close a tunnel by sending EOS (half-close).
This sends a final DATA|EOS frame (with empty payload) to signal the end of the outgoing stream. The tunnel receiver remains active to receive the peer’s remaining chunks until they also send EOS.
After calling this, no more chunks should be sent on this channel.
Sourcepub fn unregister_tunnel(&self, channel_id: u32)
pub fn unregister_tunnel(&self, channel_id: u32)
Unregister a tunnel without sending EOS.
Use this when the tunnel was closed by the remote side (you received EOS) and you want to clean up without sending another EOS.
Sourcepub async fn start_streaming_call(
&self,
method_id: u32,
payload: Vec<u8>,
) -> Result<Receiver<TunnelChunk>, RpcError>
pub async fn start_streaming_call( &self, method_id: u32, payload: Vec<u8>, ) -> Result<Receiver<TunnelChunk>, RpcError>
Start a streaming RPC call.
This sends the request and returns a receiver for streaming responses.
Unlike call(), this doesn’t wait for a single response - instead,
responses are routed to the returned receiver as TunnelChunks.
The caller should:
- Consume chunks from the receiver
- Check
chunk.is_errorand parse as error if true - Otherwise deserialize
chunk.payloadas the expected type - Stop when
chunk.is_eosis true
§Example
let rx = session.start_streaming_call(method_id, payload).await?;
while let Some(chunk) = rx.recv().await {
if chunk.is_error {
let err = parse_error_payload(&chunk.payload);
return Err(err);
}
if chunk.is_eos && chunk.payload.is_empty() {
break; // Stream ended normally
}
let item: T = deserialize(&chunk.payload)?;
// process item...
}Sourcepub async fn send_response(&self, frame: &Frame) -> Result<(), RpcError>
pub async fn send_response(&self, frame: &Frame) -> Result<(), RpcError>
Send a response frame.
Sourcepub async fn run(self: Arc<Self>) -> Result<(), TransportError>
pub async fn run(self: Arc<Self>) -> Result<(), TransportError>
Run the demux loop.
This is the main event loop that:
- Receives frames from the transport
- Routes tunnel frames to registered tunnel receivers
- Routes responses to waiting clients
- Dispatches requests to the registered handler
This method consumes self and runs until the transport closes.