datacake_rpc/client.rs
1use std::borrow::Cow;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use crate::handler::{Handler, RpcService, TryAsBody, TryIntoBody};
6use crate::net::{Channel, Status};
7use crate::request::{MessageMetadata, RequestContents};
8use crate::Body;
9
10/// A type alias for the returned data view of the RPC message reply.
11pub type MessageReply<Svc, Msg> =
12 <<Svc as Handler<Msg>>::Reply as RequestContents>::Content;
13
14/// A RPC client handle for a given service.
15///
16/// ```rust
17/// use rkyv::{Archive, Deserialize, Serialize};
18/// use datacake_rpc::{Handler, Request, RpcService, ServiceRegistry, Status, RpcClient, Channel};
19/// use std::net::SocketAddr;
20///
21/// #[repr(C)]
22/// #[derive(Serialize, Deserialize, Archive, PartialEq, Debug)]
23/// #[archive(compare(PartialEq), check_bytes)]
24/// #[archive_attr(derive(PartialEq, Debug))]
25/// pub struct MyMessage {
26/// name: String,
27/// age: u32,
28/// }
29///
30/// pub struct EchoService;
31///
32/// impl RpcService for EchoService {
33/// fn register_handlers(registry: &mut ServiceRegistry<Self>) {
34/// registry.add_handler::<MyMessage>();
35/// }
36/// }
37///
38/// #[datacake_rpc::async_trait]
39/// impl Handler<MyMessage> for EchoService {
40/// type Reply = MyMessage;
41///
42/// async fn on_message(&self, msg: Request<MyMessage>) -> Result<Self::Reply, Status> {
43/// Ok(msg.to_owned().unwrap())
44/// }
45/// }
46///
47/// # #[tokio::main]
48/// # async fn main() -> anyhow::Result<()> {
49/// # use datacake_rpc::Server;
50/// # let bind = "127.0.0.1:8000".parse::<SocketAddr>().unwrap();
51/// # let server = Server::listen(bind).await.unwrap();
52/// # server.add_service(EchoService);
53/// let connect = "127.0.0.1:8000".parse::<SocketAddr>()?;
54/// let client = Channel::connect(connect);
55///
56/// let rpc_client = RpcClient::<EchoService>::new(client);
57///
58/// let msg = MyMessage {
59/// name: "Bobby".to_string(),
60/// age: 12,
61/// };
62///
63/// let resp = rpc_client.send(&msg).await?;
64/// assert_eq!(resp, msg);
65/// # Ok(())
66/// # }
67/// ```
68pub struct RpcClient<Svc>
69where
70 Svc: RpcService,
71{
72 channel: Channel,
73 timeout: Option<Duration>,
74 _p: PhantomData<Svc>,
75}
76
77impl<Svc> Clone for RpcClient<Svc>
78where
79 Svc: RpcService,
80{
81 fn clone(&self) -> Self {
82 Self {
83 channel: self.channel.clone(),
84 timeout: self.timeout,
85 _p: PhantomData,
86 }
87 }
88}
89impl<Svc> RpcClient<Svc>
90where
91 Svc: RpcService,
92{
93 /// Creates a new RPC client which can handle a new service type.
94 ///
95 /// [RpcClient]'s are cheap to create and should be preferred over
96 /// locking or other synchronization primitives.
97 pub fn new(channel: Channel) -> Self {
98 Self {
99 channel,
100 timeout: None,
101 _p: PhantomData,
102 }
103 }
104
105 /// Sets a timeout of a given amount of time.
106 ///
107 /// If any requests exceed this amount of time `Status::timeout` is returned.
108 pub fn set_timeout(&mut self, timeout: Duration) {
109 self.timeout = Some(timeout);
110 }
111
112 /// Creates a new RPC client which can handle a new service type.
113 ///
114 /// [RpcClient]'s are cheap to create and should be preferred over
115 /// locking or other synchronization primitives.
116 pub fn new_client<Svc2>(&self) -> RpcClient<Svc2>
117 where
118 Svc2: RpcService,
119 {
120 RpcClient {
121 channel: self.channel.clone(),
122 timeout: None,
123 _p: PhantomData,
124 }
125 }
126
127 /// Sends a message to the server and wait for a reply.
128 ///
129 /// This lets you send messages behind a reference which can help
130 /// avoid excess copies when it isn't needed.
131 ///
132 /// In the event you need to send a [Body] or type which must consume `self`
133 /// you can use [Self::send_owned]
134 pub async fn send<Msg>(&self, msg: &Msg) -> Result<MessageReply<Svc, Msg>, Status>
135 where
136 Msg: RequestContents + TryAsBody,
137 Svc: Handler<Msg>,
138 // Due to some interesting compiler errors, we couldn't use GATs here to enforce
139 // this on the trait side, which is a shame.
140 <Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
141 {
142 let metadata = MessageMetadata {
143 service_name: Cow::Borrowed(<Svc as RpcService>::service_name()),
144 path: Cow::Borrowed(<Svc as Handler<Msg>>::path()),
145 };
146
147 let body = msg.try_as_body()?;
148 self.send_body(body, metadata).await
149 }
150
151 /// Sends a message to the server and wait for a reply using an owned
152 /// message value.
153 ///
154 /// This allows you to send types implementing [TryIntoBody] like [Body].
155 pub async fn send_owned<Msg>(
156 &self,
157 msg: Msg,
158 ) -> Result<MessageReply<Svc, Msg>, Status>
159 where
160 Msg: RequestContents + TryIntoBody,
161 Svc: Handler<Msg>,
162 // Due to some interesting compiler errors, we couldn't use GATs here to enforce
163 // this on the trait side, which is a shame.
164 <Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
165 {
166 let metadata = MessageMetadata {
167 service_name: Cow::Borrowed(<Svc as RpcService>::service_name()),
168 path: Cow::Borrowed(<Svc as Handler<Msg>>::path()),
169 };
170
171 let body = msg.try_into_body()?;
172 self.send_body(body, metadata).await
173 }
174
175 async fn send_body<Msg>(
176 &self,
177 body: Body,
178 metadata: MessageMetadata,
179 ) -> Result<MessageReply<Svc, Msg>, Status>
180 where
181 Msg: RequestContents,
182 Svc: Handler<Msg>,
183 // Due to some interesting compiler errors, we couldn't use GATs here to enforce
184 // this on the trait side, which is a shame.
185 <Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
186 {
187 let future = self.channel.send_msg(metadata, body);
188
189 let result = match self.timeout {
190 Some(duration) => tokio::time::timeout(duration, future)
191 .await
192 .map_err(|_| Status::timeout())?
193 .map_err(Status::connection)?,
194 None => future.await.map_err(Status::connection)?,
195 };
196
197 match result {
198 Ok(body) => <<Svc as Handler<Msg>>::Reply>::from_body(body).await,
199 Err(buffer) => {
200 let status = rkyv::from_bytes(&buffer).map_err(|_| Status::invalid())?;
201 Err(status)
202 },
203 }
204 }
205}