1#![warn(missing_docs)]
84#[macro_use]
85extern crate slog;
86#[macro_use]
87extern crate trackable;
88
89pub use error::{Error, ErrorKind};
90
91pub mod client {
92 pub use crate::client_service::{ClientService, ClientServiceBuilder, ClientServiceHandle};
95 pub use crate::client_side_handlers::Response;
96 pub use crate::rpc_client::{CallClient, CastClient, Options};
97}
98pub mod channel;
99pub mod metrics;
100pub mod server {
101 pub use crate::rpc_server::{Server, ServerBuilder};
104 pub use crate::server_side_handlers::{HandleCall, HandleCast, NoReply, Reply};
105}
106
107use crate::client::{CallClient, CastClient, ClientServiceHandle};
108
109mod client_service;
110mod client_side_channel;
111mod client_side_handlers;
112mod error;
113mod message;
114mod message_stream;
115mod packet;
116mod rpc_client;
117mod rpc_server;
118mod server_side_channel;
119mod server_side_handlers;
120
121pub type Result<T> = std::result::Result<T, Error>;
123
124#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
128pub struct ProcedureId(pub u32);
129impl std::fmt::Debug for ProcedureId {
130 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
131 write!(f, "ProcedureId(0x{:08x})", self.0)
132 }
133}
134
135pub trait Call: Sized + Send + Sync + 'static {
137 const ID: ProcedureId;
139
140 const NAME: &'static str;
144
145 type Req: Send + 'static;
147
148 type ReqEncoder: bytecodec::Encode<Item = Self::Req> + Send + 'static;
150
151 type ReqDecoder: bytecodec::Decode<Item = Self::Req> + Send + 'static;
153
154 type Res: Send + 'static;
156
157 type ResEncoder: bytecodec::Encode<Item = Self::Res> + Send + 'static;
159
160 type ResDecoder: bytecodec::Decode<Item = Self::Res> + Send + 'static;
162
163 #[allow(unused_variables)]
170 fn enable_async_request(request: &Self::Req) -> bool {
171 false
172 }
173
174 #[allow(unused_variables)]
181 fn enable_async_response(response: &Self::Res) -> bool {
182 false
183 }
184
185 fn client(service: &ClientServiceHandle) -> CallClient<Self>
187 where
188 Self::ReqEncoder: Default,
189 Self::ResDecoder: Default,
190 {
191 Self::client_with_codec(service, Default::default(), Default::default())
192 }
193
194 fn client_with_decoder(
196 service: &ClientServiceHandle,
197 decoder: Self::ResDecoder,
198 ) -> CallClient<Self>
199 where
200 Self::ReqEncoder: Default,
201 {
202 Self::client_with_codec(service, decoder, Default::default())
203 }
204
205 fn client_with_encoder(
207 service: &ClientServiceHandle,
208 encoder: Self::ReqEncoder,
209 ) -> CallClient<Self>
210 where
211 Self::ResDecoder: Default,
212 {
213 Self::client_with_codec(service, Default::default(), encoder)
214 }
215
216 fn client_with_codec(
218 service: &ClientServiceHandle,
219 decoder: Self::ResDecoder,
220 encoder: Self::ReqEncoder,
221 ) -> CallClient<Self> {
222 CallClient::new(service, decoder, encoder)
223 }
224}
225
226pub trait Cast: Sized + Sync + Send + 'static {
228 const ID: ProcedureId;
230
231 const NAME: &'static str;
235
236 type Notification: Send + 'static;
238
239 type Encoder: bytecodec::Encode<Item = Self::Notification> + Send + 'static;
241
242 type Decoder: bytecodec::Decode<Item = Self::Notification> + Send + 'static;
244
245 #[allow(unused_variables)]
252 fn enable_async(notification: &Self::Notification) -> bool {
253 false
254 }
255
256 fn client(service: &ClientServiceHandle) -> CastClient<Self>
258 where
259 Self::Encoder: Default,
260 {
261 Self::client_with_encoder(service, Default::default())
262 }
263
264 fn client_with_encoder(
266 service: &ClientServiceHandle,
267 encoder: Self::Encoder,
268 ) -> CastClient<Self> {
269 CastClient::new(service, encoder)
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use crate::client::ClientServiceBuilder;
276 use crate::server::{HandleCall, Reply, ServerBuilder};
277 use crate::{Call, ProcedureId};
278 use bytecodec::bytes::{BytesEncoder, RemainingBytesDecoder};
279 use fibers_global;
280 use futures::Future;
281 use trackable::result::TestResult;
282
283 struct EchoRpc;
285 impl Call for EchoRpc {
286 const ID: ProcedureId = ProcedureId(0);
287 const NAME: &'static str = "echo";
288
289 type Req = Vec<u8>;
290 type ReqEncoder = BytesEncoder<Vec<u8>>;
291 type ReqDecoder = RemainingBytesDecoder;
292
293 type Res = Vec<u8>;
294 type ResEncoder = BytesEncoder<Vec<u8>>;
295 type ResDecoder = RemainingBytesDecoder;
296
297 fn enable_async_request(x: &Self::Req) -> bool {
298 x == b"async"
299 }
300
301 fn enable_async_response(x: &Self::Res) -> bool {
302 x == b"async"
303 }
304 }
305
306 struct EchoHandler;
308 impl HandleCall<EchoRpc> for EchoHandler {
309 fn handle_call(&self, request: <EchoRpc as Call>::Req) -> Reply<EchoRpc> {
310 Reply::done(request)
311 }
312 }
313
314 #[test]
315 fn it_works() -> TestResult {
316 let mut builder = ServerBuilder::new("127.0.0.1:0".parse().unwrap());
318 builder.add_call_handler(EchoHandler);
319 let server = builder.finish(fibers_global::handle());
320 let (server, server_addr) = track!(fibers_global::execute(server.local_addr()))?;
321 fibers_global::spawn(server.map_err(|e| panic!("{}", e)));
322
323 let service = ClientServiceBuilder::new().finish(fibers_global::handle());
325 let service_handle = service.handle();
326 fibers_global::spawn(service.map_err(|e| panic!("{}", e)));
327
328 let request = Vec::from(&b"hello"[..]);
329 let response = EchoRpc::client(&service_handle).call(server_addr, request.clone());
330 let response = track_any_err!(fibers_global::execute(response))?;
331 assert_eq!(response, request);
332
333 let metrics = service_handle
334 .metrics()
335 .channels()
336 .as_map()
337 .load()
338 .get(&server_addr)
339 .cloned()
340 .unwrap();
341 assert_eq!(metrics.async_outgoing_messages(), 0);
342 assert_eq!(metrics.async_incoming_messages(), 0);
343 Ok(())
344 }
345
346 #[test]
347 fn large_message_works() -> TestResult {
348 let mut builder = ServerBuilder::new("127.0.0.1:0".parse().unwrap());
350 builder.add_call_handler(EchoHandler);
351 let server = builder.finish(fibers_global::handle());
352 let future = server.local_addr();
354 let (server, server_addr) = track!(fibers_global::execute(future))?;
355 fibers_global::spawn(server.map_err(|e| panic!("{}", e)));
356
357 let service = ClientServiceBuilder::new().finish(fibers_global::handle());
359 let service_handle = service.handle();
360 fibers_global::spawn(service.map_err(|e| panic!("{}", e)));
361
362 let request = vec![0; 10 * 1024 * 1024];
363 let response = EchoRpc::client(&service_handle).call(server_addr, request.clone());
364 let response = track!(fibers_global::execute(response))?;
365 assert_eq!(response, request);
366 Ok(())
367 }
368
369 #[test]
370 fn async_works() -> TestResult {
371 let mut builder = ServerBuilder::new("127.0.0.1:0".parse().unwrap());
373 builder.add_call_handler(EchoHandler);
374 let server = builder.finish(fibers_global::handle());
375 let (server, server_addr) = track!(fibers_global::execute(server.local_addr()))?;
376 fibers_global::spawn(server.map_err(|e| panic!("{}", e)));
377
378 let service = ClientServiceBuilder::new().finish(fibers_global::handle());
380 let service_handle = service.handle();
381 fibers_global::spawn(service.map_err(|e| panic!("{}", e)));
382
383 let request = Vec::from(&b"async"[..]);
384 let response = EchoRpc::client(&service_handle).call(server_addr, request.clone());
385 let response = track!(fibers_global::execute(response))?;
386 assert_eq!(response, request);
387
388 let metrics = service_handle
389 .metrics()
390 .channels()
391 .as_map()
392 .load()
393 .get(&server_addr)
394 .cloned()
395 .unwrap();
396 assert_eq!(metrics.async_outgoing_messages(), 1);
397 assert_eq!(metrics.async_incoming_messages(), 1);
398 Ok(())
399 }
400}