datacake_rpc/
client.rs

1use std::borrow::Cow;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use crate::handler::{Handler, RpcService, TryAsBody, TryIntoBody};
6use crate::net::{Channel, Status};
7use crate::request::{MessageMetadata, RequestContents};
8use crate::Body;
9
10/// A type alias for the returned data view of the RPC message reply.
11pub type MessageReply<Svc, Msg> =
12    <<Svc as Handler<Msg>>::Reply as RequestContents>::Content;
13
14/// A RPC client handle for a given service.
15///
16/// ```rust
17/// use rkyv::{Archive, Deserialize, Serialize};
18/// use datacake_rpc::{Handler, Request, RpcService, ServiceRegistry, Status, RpcClient, Channel};
19/// use std::net::SocketAddr;
20///
21/// #[repr(C)]
22/// #[derive(Serialize, Deserialize, Archive, PartialEq, Debug)]
23/// #[archive(compare(PartialEq), check_bytes)]
24/// #[archive_attr(derive(PartialEq, Debug))]
25/// pub struct MyMessage {
26///     name: String,
27///     age: u32,
28/// }
29///
30/// pub struct EchoService;
31///
32/// impl RpcService for EchoService {
33///     fn register_handlers(registry: &mut ServiceRegistry<Self>) {
34///         registry.add_handler::<MyMessage>();
35///     }
36/// }
37///
38/// #[datacake_rpc::async_trait]
39/// impl Handler<MyMessage> for EchoService {
40///     type Reply = MyMessage;
41///
42///     async fn on_message(&self, msg: Request<MyMessage>) -> Result<Self::Reply, Status> {
43///         Ok(msg.to_owned().unwrap())
44///     }
45/// }
46///
47/// # #[tokio::main]
48/// # async fn main() -> anyhow::Result<()> {
49/// # use datacake_rpc::Server;
50/// # let bind = "127.0.0.1:8000".parse::<SocketAddr>().unwrap();
51/// # let server = Server::listen(bind).await.unwrap();
52/// # server.add_service(EchoService);
53/// let connect = "127.0.0.1:8000".parse::<SocketAddr>()?;
54/// let client = Channel::connect(connect);
55///
56/// let rpc_client = RpcClient::<EchoService>::new(client);
57///
58/// let msg = MyMessage {
59///     name: "Bobby".to_string(),
60///     age: 12,
61/// };
62///
63/// let resp = rpc_client.send(&msg).await?;
64/// assert_eq!(resp, msg);
65/// # Ok(())
66/// # }
67/// ```
68pub struct RpcClient<Svc>
69where
70    Svc: RpcService,
71{
72    channel: Channel,
73    timeout: Option<Duration>,
74    _p: PhantomData<Svc>,
75}
76
77impl<Svc> Clone for RpcClient<Svc>
78where
79    Svc: RpcService,
80{
81    fn clone(&self) -> Self {
82        Self {
83            channel: self.channel.clone(),
84            timeout: self.timeout,
85            _p: PhantomData,
86        }
87    }
88}
89impl<Svc> RpcClient<Svc>
90where
91    Svc: RpcService,
92{
93    /// Creates a new RPC client which can handle a new service type.
94    ///
95    /// [RpcClient]'s are cheap to create and should be preferred over
96    /// locking or other synchronization primitives.
97    pub fn new(channel: Channel) -> Self {
98        Self {
99            channel,
100            timeout: None,
101            _p: PhantomData,
102        }
103    }
104
105    /// Sets a timeout of a given amount of time.
106    ///
107    /// If any requests exceed this amount of time `Status::timeout` is returned.
108    pub fn set_timeout(&mut self, timeout: Duration) {
109        self.timeout = Some(timeout);
110    }
111
112    /// Creates a new RPC client which can handle a new service type.
113    ///
114    /// [RpcClient]'s are cheap to create and should be preferred over
115    /// locking or other synchronization primitives.
116    pub fn new_client<Svc2>(&self) -> RpcClient<Svc2>
117    where
118        Svc2: RpcService,
119    {
120        RpcClient {
121            channel: self.channel.clone(),
122            timeout: None,
123            _p: PhantomData,
124        }
125    }
126
127    /// Sends a message to the server and wait for a reply.
128    ///
129    /// This lets you send messages behind a reference which can help
130    /// avoid excess copies when it isn't needed.
131    ///
132    /// In the event you need to send a [Body] or type which must consume `self`
133    /// you can use [Self::send_owned]
134    pub async fn send<Msg>(&self, msg: &Msg) -> Result<MessageReply<Svc, Msg>, Status>
135    where
136        Msg: RequestContents + TryAsBody,
137        Svc: Handler<Msg>,
138        // Due to some interesting compiler errors, we couldn't use GATs here to enforce
139        // this on the trait side, which is a shame.
140        <Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
141    {
142        let metadata = MessageMetadata {
143            service_name: Cow::Borrowed(<Svc as RpcService>::service_name()),
144            path: Cow::Borrowed(<Svc as Handler<Msg>>::path()),
145        };
146
147        let body = msg.try_as_body()?;
148        self.send_body(body, metadata).await
149    }
150
151    /// Sends a message to the server and wait for a reply using an owned
152    /// message value.
153    ///
154    /// This allows you to send types implementing [TryIntoBody] like [Body].
155    pub async fn send_owned<Msg>(
156        &self,
157        msg: Msg,
158    ) -> Result<MessageReply<Svc, Msg>, Status>
159    where
160        Msg: RequestContents + TryIntoBody,
161        Svc: Handler<Msg>,
162        // Due to some interesting compiler errors, we couldn't use GATs here to enforce
163        // this on the trait side, which is a shame.
164        <Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
165    {
166        let metadata = MessageMetadata {
167            service_name: Cow::Borrowed(<Svc as RpcService>::service_name()),
168            path: Cow::Borrowed(<Svc as Handler<Msg>>::path()),
169        };
170
171        let body = msg.try_into_body()?;
172        self.send_body(body, metadata).await
173    }
174
175    async fn send_body<Msg>(
176        &self,
177        body: Body,
178        metadata: MessageMetadata,
179    ) -> Result<MessageReply<Svc, Msg>, Status>
180    where
181        Msg: RequestContents,
182        Svc: Handler<Msg>,
183        // Due to some interesting compiler errors, we couldn't use GATs here to enforce
184        // this on the trait side, which is a shame.
185        <Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
186    {
187        let future = self.channel.send_msg(metadata, body);
188
189        let result = match self.timeout {
190            Some(duration) => tokio::time::timeout(duration, future)
191                .await
192                .map_err(|_| Status::timeout())?
193                .map_err(Status::connection)?,
194            None => future.await.map_err(Status::connection)?,
195        };
196
197        match result {
198            Ok(body) => <<Svc as Handler<Msg>>::Reply>::from_body(body).await,
199            Err(buffer) => {
200                let status = rkyv::from_bytes(&buffer).map_err(|_| Status::invalid())?;
201                Err(status)
202            },
203        }
204    }
205}