Trait StreamingRpcExt 

pub trait StreamingRpcExt {
    // Required methods
    fn stream_request<Req, Resp>(
        &self,
        topic: &str,
        request: &Req,
    ) -> impl Future<Output = Result<StreamReceiver<Resp>>> + Send
       where Req: Message + Serialize + Clone,
             Resp: Message + DeserializeOwned + Send + 'static;
    fn stream_respond<Req, Resp, F, Fut>(
        &self,
        topic: &str,
        handler: F,
    ) -> impl Future<Output = Result<()>> + Send
       where Req: Message + DeserializeOwned + Send + 'static,
             Resp: Message + Serialize + Clone + Send + 'static,
             F: Fn(Req, StreamSender<Resp>) -> Fut + Send + Sync + 'static,
             Fut: Future<Output = Result<()>> + Send + 'static;
    fn bidirectional_stream<T>(
        &self,
        topic: &str,
    ) -> impl Future<Output = Result<(StreamSender<T>, StreamReceiver<T>)>> + Send
       where T: Message + Serialize + DeserializeOwned + Clone + Send + 'static;
}

Extension trait that adds streaming RPC methods to Context.

Required Methods

fn stream_request<Req, Resp>(
    &self,
    topic: &str,
    request: &Req,
) -> impl Future<Output = Result<StreamReceiver<Resp>>> + Send
where
    Req: Message + Serialize + Clone,
    Resp: Message + DeserializeOwned + Send + 'static,

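Creates a server-streaming RPC: the client sends a single request on topic and the server streams back responses. On success, the future resolves to a StreamReceiver<Resp> from which the responses are read.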

Example
let mut stream = ctx
    .stream_request("/camera/feed", &CameraRequest { camera_id: 0 })
    .await?;

while let Some(frame) = stream.recv().await {
    process_frame(&frame)?;
}
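
The one-request, many-responses shape can be tried without the real transport. The following is a minimal sketch that uses only the standard library: std::sync::mpsc stands in for StreamReceiver, a thread stands in for the remote server, and CameraRequest, Frame, stream_request_sketch and collect_frames are hypothetical names introduced for illustration. It is not how this crate implements stream_request.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical request type, mirroring `CameraRequest` from the example above.
#[derive(Clone, Debug)]
struct CameraRequest {
    camera_id: u32,
}

/// Hypothetical response type standing in for a camera frame.
#[derive(Debug, PartialEq)]
struct Frame {
    camera_id: u32,
    sequence: u32,
}

/// Models one server-streaming call: a single request goes in and a receiver
/// of responses comes back. A thread stands in for the remote server.
fn stream_request_sketch(request: &CameraRequest, frame_count: u32) -> mpsc::Receiver<Frame> {
    let (tx, rx) = mpsc::channel();
    let request = request.clone();
    thread::spawn(move || {
        for sequence in 0..frame_count {
            let frame = Frame { camera_id: request.camera_id, sequence };
            // Stop early if the client has dropped its receiver.
            if tx.send(frame).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the stream on the client side.
    });
    rx
}

/// Drains the stream until the server side closes it, as the
/// `while let Some(frame)` loop does in the example above.
fn collect_frames(camera_id: u32, frame_count: u32) -> Vec<Frame> {
    let rx = stream_request_sketch(&CameraRequest { camera_id }, frame_count);
    rx.iter().collect()
}
```

Calling collect_frames(7, 3) returns three frames with sequence numbers 0, 1 and 2. The loop ends because the sending half is dropped, not because of a sentinel message.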

fn stream_respond<Req, Resp, F, Fut>(
    &self,
    topic: &str,
    handler: F,
) -> impl Future<Output = Result<()>> + Send
where
    Req: Message + DeserializeOwned + Send + 'static,
    Resp: Message + Serialize + Clone + Send + 'static,
    F: Fn(Req, StreamSender<Resp>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static,

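Handles server-streaming RPC requests on topic. The handler receives the decoded request and a StreamSender<Resp> through which it streams responses back to the client.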

Example
ctx.stream_respond("/camera/feed", |req: CameraRequest, sender| async move {
    loop {
        let frame = capture_frame(req.camera_id).await?;
        sender.send(&frame).await?;
        tokio::time::sleep(Duration::from_millis(33)).await; // roughly 30 frames per second
    }
}).await?;
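
The handler above loops forever, so its only exit is the ? on a failed call. The following sketch shows that pattern with the standard library alone, assuming (this page does not state it) that a send fails once the client has gone away. std::sync::mpsc::SyncSender stands in for StreamSender, and camera_handler and run_until_client_leaves are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical handler in the shape of the closure above: it receives the
/// request data and a sender, and streams until a send fails.
fn camera_handler(
    camera_id: u32,
    sender: mpsc::SyncSender<u32>,
) -> Result<(), mpsc::SendError<u32>> {
    let mut sequence = 0;
    loop {
        // `?` is the only way out of this loop, as in the example above.
        sender.send(camera_id * 1000 + sequence)?;
        sequence += 1;
    }
}

/// Runs the handler on a thread, reads `wanted` frames, then drops the
/// receiver and reports whether the handler stopped with an error.
fn run_until_client_leaves(wanted: usize) -> (Vec<u32>, bool) {
    // A bounded channel keeps the handler from running far ahead of the reader.
    let (tx, rx) = mpsc::sync_channel(1);
    let server = thread::spawn(move || camera_handler(2, tx));
    let frames: Vec<u32> = rx.iter().take(wanted).collect();
    drop(rx); // the client goes away
    let handler_result = server.join().expect("handler thread panicked");
    (frames, handler_result.is_err())
}
```

run_until_client_leaves(3) yields the frames 2000, 2001 and 2002, and the handler then returns an error because its next send has no receiver.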

fn bidirectional_stream<T>(
    &self,
    topic: &str,
) -> impl Future<Output = Result<(StreamSender<T>, StreamReceiver<T>)>> + Send
where
    T: Message + Serialize + DeserializeOwned + Clone + Send + 'static,

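Creates a bidirectional stream on topic in which both sides can send and receive messages of type T. On success, the future resolves to a (StreamSender<T>, StreamReceiver<T>) pair.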

Example
let (mut tx, mut rx) = ctx.bidirectional_stream("/chat").await?;

// Send
tx.send(&ChatMessage { text: "Hello".into() }).await?;

// Receive
while let Some(msg) = rx.recv().await {
    println!("Received: {}", msg.text);
}
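
A sender and receiver pair of this kind can be modelled as two one-way channels, one per direction. The following is a sketch under that assumption, built on std::sync::mpsc instead of the real transport. ChatMessage mirrors the example above, while Endpoint, duplex_pair and chat_with_echo_peer are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical message type, mirroring `ChatMessage` from the example above.
#[derive(Clone, Debug, PartialEq)]
struct ChatMessage {
    text: String,
}

/// One side of the stream: a sender for one direction, a receiver for the other.
type Endpoint = (Sender<ChatMessage>, Receiver<ChatMessage>);

/// Builds two connected endpoints out of two one-way channels.
fn duplex_pair() -> (Endpoint, Endpoint) {
    let (a_tx, b_rx) = mpsc::channel();
    let (b_tx, a_rx) = mpsc::channel();
    ((a_tx, a_rx), (b_tx, b_rx))
}

/// Sends `lines` to a peer that answers each one, and returns the replies.
fn chat_with_echo_peer(lines: &[&str]) -> Vec<String> {
    let ((tx, rx), (peer_tx, peer_rx)) = duplex_pair();
    let peer = thread::spawn(move || {
        // The peer both receives and sends on its endpoint.
        for msg in peer_rx {
            let reply = ChatMessage { text: format!("echo: {}", msg.text) };
            if peer_tx.send(reply).is_err() {
                break;
            }
        }
    });
    for line in lines {
        tx.send(ChatMessage { text: (*line).to_string() })
            .expect("peer hung up");
    }
    drop(tx); // closing our sending half lets the peer's loop finish
    let replies: Vec<String> = rx.iter().map(|msg| msg.text).collect();
    peer.join().expect("peer thread panicked");
    replies
}
```

chat_with_echo_peer(&["Hello", "Bye"]) returns "echo: Hello" followed by "echo: Bye". Each direction closes independently: dropping tx ends the peer's receive loop, and the peer's exit in turn ends the local one.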

Dyn Compatibility

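This trait is not dyn compatible: its methods are generic and return impl Future, so they cannot be called through a dyn StreamingRpcExt trait object.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.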
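
In practice this means code that accepts the trait has to use static dispatch. The following sketch illustrates this with a stand-in trait, since the real Context type is not shown on this page. GenericSend, FakeContext, via_generic and via_impl_trait are hypothetical names, and the generic describe method plays the role of the generic methods on StreamingRpcExt.

```rust
use std::fmt::Display;

/// Hypothetical stand-in for a trait with generic methods.
trait GenericSend {
    fn describe<T: Display>(&self, topic: &str, payload: &T) -> String;
}

/// Hypothetical stand-in for the type that implements the trait.
struct FakeContext {
    node: &'static str,
}

impl GenericSend for FakeContext {
    fn describe<T: Display>(&self, topic: &str, payload: &T) -> String {
        format!("{} -> {}: {}", self.node, topic, payload)
    }
}

// This would be rejected by the compiler, because a trait with a generic
// method is not dyn compatible:
// fn via_trait_object(ctx: &dyn GenericSend) -> String { ctx.describe("/chat", &42) }

/// Static dispatch works: the function is compiled once per concrete `C`.
fn via_generic<C: GenericSend>(ctx: &C) -> String {
    ctx.describe("/chat", &42)
}

/// `impl Trait` in argument position is shorthand for the same generic form.
fn via_impl_trait(ctx: &impl GenericSend) -> String {
    ctx.describe("/chat", &"hi")
}
```

With a FakeContext whose node is "n1", via_generic returns "n1 -> /chat: 42" and via_impl_trait returns "n1 -> /chat: hi".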

Implementors