fibers_rpc/
lib.rs

1//! RPC library built on top of [fibers] crate.
2//!
3//!
4//! [fibers]: https://github.com/dwango/fibers-rs
5//!
6//! # Features
7//!
8//! - Asynchronous RPC server/client using [fibers] crate
9//! - Supports two types of RPC:
10//!   - Request/response model
11//!   - Notification model
12//! - Strongly typed RPC using [bytecodec] crate
13//!   - You can treat arbitrary Rust structures that support [serde] as RPC messages
14//!   - It is possible to handle huge structures as RPC messages without
15//!     compromising efficiency and real-time property by implementing your own encoder/decoder
16//! - Multiplexing multiple RPC messages in a single TCP stream
17//! - Prioritization between messages
18//! - Expose [Prometheus] metrics
19//!
20//! [fibers]: https://github.com/dwango/fibers-rs
21//! [bytecodec]: https://github.com/sile/bytecodec
22//! [serde]: https://crates.io/crates/serde
23//! [Prometheus]: https://prometheus.io/
24//!
25//! # Technical Details
26//!
27//! See [doc/].
28//!
29//! [doc/]: https://github.com/sile/fibers_rpc/tree/master/doc
30//!
31//! # Examples
32//!
33//! Simple echo RPC server:
34//!
35//! ```
36//! # fn main() -> trackable::result::MainResult {
37//! use bytecodec::bytes::{BytesEncoder, RemainingBytesDecoder};
38//! use fibers_rpc::{Call, ProcedureId};
39//! use fibers_rpc::client::ClientServiceBuilder;
40//! use fibers_rpc::server::{HandleCall, Reply, ServerBuilder};
41//! use futures::Future;
42//!
43//! // RPC definition
44//! struct EchoRpc;
45//! impl Call for EchoRpc {
46//!     const ID: ProcedureId = ProcedureId(0);
47//!     const NAME: &'static str = "echo";
48//!
49//!     type Req = Vec<u8>;
50//!     type ReqEncoder = BytesEncoder<Vec<u8>>;
51//!     type ReqDecoder = RemainingBytesDecoder;
52//!
53//!     type Res = Vec<u8>;
54//!     type ResEncoder = BytesEncoder<Vec<u8>>;
55//!     type ResDecoder = RemainingBytesDecoder;
56//! }
57//!
58//! // RPC server
59//! struct EchoHandler;
60//! impl HandleCall<EchoRpc> for EchoHandler {
61//!     fn handle_call(&self, request: <EchoRpc as Call>::Req) -> Reply<EchoRpc> {
62//!         Reply::done(request)
63//!     }
64//! }
65//! let server_addr = "127.0.0.1:1919".parse().unwrap();
66//! let mut builder = ServerBuilder::new(server_addr);
67//! builder.add_call_handler(EchoHandler);
68//! let server = builder.finish(fibers_global::handle());
69//! fibers_global::spawn(server.map_err(|e| panic!("{}", e)));
70//!
71//! // RPC client
72//! let service = ClientServiceBuilder::new().finish(fibers_global::handle());
73//! let service_handle = service.handle();
74//! fibers_global::spawn(service.map_err(|e| panic!("{}", e)));
75//!
76//! let request = Vec::from(&b"hello"[..]);
77//! let response = EchoRpc::client(&service_handle).call(server_addr, request.clone());
78//! let response = fibers_global::execute(response)?;
79//! assert_eq!(response, request);
80//! # Ok(())
81//! # }
82//! ```
83#![warn(missing_docs)]
84#[macro_use]
85extern crate slog;
86#[macro_use]
87extern crate trackable;
88
89pub use error::{Error, ErrorKind};
90
91pub mod client {
92    //! RPC client.
93
94    pub use crate::client_service::{ClientService, ClientServiceBuilder, ClientServiceHandle};
95    pub use crate::client_side_handlers::Response;
96    pub use crate::rpc_client::{CallClient, CastClient, Options};
97}
98pub mod channel;
99pub mod metrics;
100pub mod server {
101    //! RPC server.
102
103    pub use crate::rpc_server::{Server, ServerBuilder};
104    pub use crate::server_side_handlers::{HandleCall, HandleCast, NoReply, Reply};
105}
106
107use crate::client::{CallClient, CastClient, ClientServiceHandle};
108
109mod client_service;
110mod client_side_channel;
111mod client_side_handlers;
112mod error;
113mod message;
114mod message_stream;
115mod packet;
116mod rpc_client;
117mod rpc_server;
118mod server_side_channel;
119mod server_side_handlers;
120
121/// This crate specific `Result` type.
122pub type Result<T> = std::result::Result<T, Error>;
123
/// Identifier of an RPC procedure.
///
/// Every procedure registered in one RPC server must use a distinct identifier.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ProcedureId(pub u32);

impl std::fmt::Debug for ProcedureId {
    /// Renders the identifier as `ProcedureId(0x........)`, i.e. the wrapped
    /// number as zero-padded, eight-digit, lowercase hexadecimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let ProcedureId(raw) = *self;
        f.write_fmt(format_args!("ProcedureId(0x{:08x})", raw))
    }
}
134
135/// Request/response RPC.
136pub trait Call: Sized + Send + Sync + 'static {
137    /// The identifier of the procedure.
138    const ID: ProcedureId;
139
140    /// The name of the procedure.
141    ///
142    /// This is only used for debugging purpose.
143    const NAME: &'static str;
144
145    /// Request message.
146    type Req: Send + 'static;
147
148    /// Request message encoder.
149    type ReqEncoder: bytecodec::Encode<Item = Self::Req> + Send + 'static;
150
151    /// Request message decoder.
152    type ReqDecoder: bytecodec::Decode<Item = Self::Req> + Send + 'static;
153
154    /// Response message.
155    type Res: Send + 'static;
156
157    /// Response message encoder.
158    type ResEncoder: bytecodec::Encode<Item = Self::Res> + Send + 'static;
159
160    /// Response message decoder.
161    type ResDecoder: bytecodec::Decode<Item = Self::Res> + Send + 'static;
162
163    /// If it returns `true`, encoding/decoding request messages will be executed asynchronously.
164    ///
165    /// For large RPC messages, asynchronous encoding/decoding may improve real-time property
166    /// (especially if messages will be encoded/decoded by using `serde`).
167    ///
168    /// The default implementation always return `false`.
169    #[allow(unused_variables)]
170    fn enable_async_request(request: &Self::Req) -> bool {
171        false
172    }
173
174    /// If it returns `true`, encoding/decoding response messages will be executed asynchronously.
175    ///
176    /// For large RPC messages, asynchronous encoding/decoding may improve real-time property
177    /// (especially if messages will be encoded/decoded by using `serde`).
178    ///
179    /// The default implementation always return `false`.
180    #[allow(unused_variables)]
181    fn enable_async_response(response: &Self::Res) -> bool {
182        false
183    }
184
185    /// Makes a new RPC client.
186    fn client(service: &ClientServiceHandle) -> CallClient<Self>
187    where
188        Self::ReqEncoder: Default,
189        Self::ResDecoder: Default,
190    {
191        Self::client_with_codec(service, Default::default(), Default::default())
192    }
193
194    /// Makes a new RPC client with the given decoder maker.
195    fn client_with_decoder(
196        service: &ClientServiceHandle,
197        decoder: Self::ResDecoder,
198    ) -> CallClient<Self>
199    where
200        Self::ReqEncoder: Default,
201    {
202        Self::client_with_codec(service, decoder, Default::default())
203    }
204
205    /// Makes a new RPC client with the given encoder maker.
206    fn client_with_encoder(
207        service: &ClientServiceHandle,
208        encoder: Self::ReqEncoder,
209    ) -> CallClient<Self>
210    where
211        Self::ResDecoder: Default,
212    {
213        Self::client_with_codec(service, Default::default(), encoder)
214    }
215
216    /// Makes a new RPC client with the given decoder and encoder makers.
217    fn client_with_codec(
218        service: &ClientServiceHandle,
219        decoder: Self::ResDecoder,
220        encoder: Self::ReqEncoder,
221    ) -> CallClient<Self> {
222        CallClient::new(service, decoder, encoder)
223    }
224}
225
226/// Notification RPC.
227pub trait Cast: Sized + Sync + Send + 'static {
228    /// The identifier of the procedure.
229    const ID: ProcedureId;
230
231    /// The name of the procedure.
232    ///
233    /// This is only used for debugging purpose.
234    const NAME: &'static str;
235
236    /// Notification message.
237    type Notification: Send + 'static;
238
239    /// Notification message encoder.
240    type Encoder: bytecodec::Encode<Item = Self::Notification> + Send + 'static;
241
242    /// Notification message decoder.
243    type Decoder: bytecodec::Decode<Item = Self::Notification> + Send + 'static;
244
245    /// If it returns `true`, encoding/decoding notification messages will be executed asynchronously.
246    ///
247    /// For large RPC messages, asynchronous encoding/decoding may improve real-time property
248    /// (especially if messages will be encoded/decoded by using `serde`).
249    ///
250    /// The default implementation always return `false`.
251    #[allow(unused_variables)]
252    fn enable_async(notification: &Self::Notification) -> bool {
253        false
254    }
255
256    /// Makes a new RPC client.
257    fn client(service: &ClientServiceHandle) -> CastClient<Self>
258    where
259        Self::Encoder: Default,
260    {
261        Self::client_with_encoder(service, Default::default())
262    }
263
264    /// Makes a new RPC client with the given encoder maker.
265    fn client_with_encoder(
266        service: &ClientServiceHandle,
267        encoder: Self::Encoder,
268    ) -> CastClient<Self> {
269        CastClient::new(service, encoder)
270    }
271}
272
#[cfg(test)]
mod tests {
    use crate::client::{ClientServiceBuilder, ClientServiceHandle};
    use crate::server::{HandleCall, Reply, ServerBuilder};
    use crate::{Call, ProcedureId};
    use bytecodec::bytes::{BytesEncoder, RemainingBytesDecoder};
    use futures::Future;
    use std::net::SocketAddr;
    use trackable::result::TestResult;

    /// Echo RPC: both the request and the response are raw byte vectors.
    struct EchoRpc;
    impl Call for EchoRpc {
        const ID: ProcedureId = ProcedureId(0);
        const NAME: &'static str = "echo";

        type Req = Vec<u8>;
        type ReqEncoder = BytesEncoder<Vec<u8>>;
        type ReqDecoder = RemainingBytesDecoder;

        type Res = Vec<u8>;
        type ResEncoder = BytesEncoder<Vec<u8>>;
        type ResDecoder = RemainingBytesDecoder;

        // Only the payload `async` opts into asynchronous encoding/decoding,
        // which lets the tests exercise both code paths with one procedure.
        fn enable_async_request(x: &Self::Req) -> bool {
            x == b"async"
        }

        fn enable_async_response(x: &Self::Res) -> bool {
            x == b"async"
        }
    }

    /// Handler that replies with the request it received.
    struct EchoHandler;
    impl HandleCall<EchoRpc> for EchoHandler {
        fn handle_call(&self, request: <EchoRpc as Call>::Req) -> Reply<EchoRpc> {
            Reply::done(request)
        }
    }

    /// Spawns an echo server bound to `127.0.0.1:0` and returns the address
    /// it is actually listening on.
    ///
    /// NOTE(review): assumes the error type of the `local_addr()` future is
    /// this crate's `Error` — confirm against `rpc_server`.
    fn spawn_echo_server() -> crate::Result<SocketAddr> {
        let mut builder = ServerBuilder::new("127.0.0.1:0".parse().unwrap());
        builder.add_call_handler(EchoHandler);
        let server = builder.finish(fibers_global::handle());
        let (server, server_addr) = track!(fibers_global::execute(server.local_addr()))?;
        fibers_global::spawn(server.map_err(|e| panic!("{}", e)));
        Ok(server_addr)
    }

    /// Spawns a client service and returns a handle to it.
    fn spawn_client_service() -> ClientServiceHandle {
        let service = ClientServiceBuilder::new().finish(fibers_global::handle());
        let service_handle = service.handle();
        fibers_global::spawn(service.map_err(|e| panic!("{}", e)));
        service_handle
    }

    #[test]
    fn it_works() -> TestResult {
        let server_addr = track!(spawn_echo_server())?;
        let service_handle = spawn_client_service();

        let request = Vec::from(&b"hello"[..]);
        let response = EchoRpc::client(&service_handle).call(server_addr, request.clone());
        let response = track!(fibers_global::execute(response))?;
        assert_eq!(response, request);

        // `hello` does not opt into asynchronous handling, so neither
        // asynchronous counter may have moved.
        let metrics = service_handle
            .metrics()
            .channels()
            .as_map()
            .load()
            .get(&server_addr)
            .cloned()
            .unwrap();
        assert_eq!(metrics.async_outgoing_messages(), 0);
        assert_eq!(metrics.async_incoming_messages(), 0);
        Ok(())
    }

    #[test]
    fn large_message_works() -> TestResult {
        let server_addr = track!(spawn_echo_server())?;
        let service_handle = spawn_client_service();

        // 10 MiB of zeros.
        let request = vec![0; 10 * 1024 * 1024];
        let response = EchoRpc::client(&service_handle).call(server_addr, request.clone());
        let response = track!(fibers_global::execute(response))?;
        assert_eq!(response, request);
        Ok(())
    }

    #[test]
    fn async_works() -> TestResult {
        let server_addr = track!(spawn_echo_server())?;
        let service_handle = spawn_client_service();

        let request = Vec::from(&b"async"[..]);
        let response = EchoRpc::client(&service_handle).call(server_addr, request.clone());
        let response = track!(fibers_global::execute(response))?;
        assert_eq!(response, request);

        // `async` opts into asynchronous handling for both the request and
        // the echoed response, so each counter must read exactly one.
        let metrics = service_handle
            .metrics()
            .channels()
            .as_map()
            .load()
            .get(&server_addr)
            .cloned()
            .unwrap();
        assert_eq!(metrics.async_outgoing_messages(), 1);
        assert_eq!(metrics.async_incoming_messages(), 1);
        Ok(())
    }
}