Skip to main content

ecksport_rpc/
client.rs

1//! Client call/response patterns.
2//!
3//! This is defined to use an existing `ConnectionHandle` so it isn't as strictly
4//! low-level as it could be, but it makes the ergonomics easier and lets us use
5//! it concurrently.
6
7use ecksport_codec::RpcCodec;
8use ecksport_core::{frame::MsgFlags, topic};
9use ecksport_net::worker::ConnectionHandle;
10
11use crate::errors::{ClientError, ClientResult, RpcError};
12
13pub struct RpcClient {
14    conn: ConnectionHandle,
15}
16
17impl RpcClient {
18    pub fn new(conn: ConnectionHandle) -> Self {
19        Self { conn }
20    }
21
22    /// Low-level function to call a server's RPC.
23    pub async fn _call_rpc_func<T: RpcCodec, R: RpcCodec>(
24        &self,
25        topic: topic::Topic,
26        arg: &T,
27    ) -> ClientResult<R> {
28        let buf = match ecksport_codec::encode_to_vec(arg) {
29            Ok(b) => b,
30            Err(e) => return Err(ClientError::MalformedRequest(e)),
31        };
32
33        // Open the channel, send the message over, and close it all in a single frame.
34        let mut channel = self
35            .conn
36            .open_channel(topic, buf, MsgFlags::close())
37            .await?;
38        let Some(msg) = channel.recv_msg().await? else {
39            // Does it make sense to return it like this?
40            return Err(ClientError::NoResponse);
41        };
42
43        // Parse it depending on how the flag was set.
44        if !msg.flags().err {
45            match R::from_slice(msg.payload()) {
46                Ok(r) => Ok(r),
47                Err(e) => Err(ClientError::MalformedResponse(e)),
48            }
49        } else {
50            match RpcError::from_slice(msg.payload()) {
51                Ok(rpc_err) => Err(ClientError::ServerRpc(rpc_err)),
52                Err(_e) => Err(ClientError::ParseErrorCode),
53            }
54        }
55    }
56}