pirates/
lib.rs

1//! Pirates - a straightforward ArrrrPC library
2//!
3//! The core of things in the RPC definition itself.
4//! you achieve this by implementing `RpcDefinition` on a struct of your choice.
5//! the `#[pirates::rpc_definition]` macro can do this for you on an impl that
6//! contains a run and implement function (Enable the "macros" feature)
7//!
8//! ```rust,no_run
9//! # pub struct AddName {}
10//! # pub use pirates_macro_lib::rpc_definition;
11//! #[pirates::rpc_definition]
12//! impl AddName {
13//!     fn name() -> RpcId {
14//!         RpcId::AddName
15//!     }
16//!     fn implement(state: &mut ServerState, query: String) -> RpcResult<()> {
17//!         state.names.push(query);
18//!         Ok(())
19//!     }
20//! }
21//! ```
22//!
23//! There are two core types these are generic over which you need to define:
24//! 1) Rpc Identifier. Create a type which implements RpcName
25//! ```rust,no_run
26//! # use serde::{Serialize,Deserialize};
27//! # use std::fmt::Formatter;
28//! #[derive(PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
29//! enum RpcId {
30//!     AddName,
31//!     GetNames,
32//! }
33//! impl std::fmt::Display for RpcId {
34//!     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35//!         match self {
36//!             Self::AddName => write!(f, "AddName"),
37//!             Self::GetNames => write!(f, "GetNames"),
38//!         }
39//!     }
40//! }
41//! ```
42//! 2) Server state. Any type inside an Arc<Mutex<T> that the server can hand to RPCs
43//! ```rust,no_run
44//! struct ServerState {
45//!     names: Vec<String>,
46//! }
47//! ```
48//!
49//!
50//! When you have an rpc definition, you can now serve it.
51//! Serving is done by creating an `RpcServer` and awaiting its `serve` method
52//!
53//! ```rust,ignore
54//! let mut server = RpcServer::new(state.clone());
55//! server.add_rpc(Box::new(rpcs::AddName::server()));
56//! server.serve("127.0.0.1:5959").await;
57//! ```
58//!
59//!
60//! Elsewhere, to call it, use the `call_client` function with access to the RPC
61//! ```rust,ignore
62//! let addr = "127.0.0.1:5959";
63//! let name = String::from("Gaspode the wonder dog");
64//! pirates::call_client(addr, name, rpcs::AddName::client()).await;
65//! ```
66
67mod client;
68mod core;
69pub mod error;
70mod rpc_types;
71mod server;
72mod transport;
73
74pub type Bytes<'a> = &'a [u8];
75pub type OwnedBytes = Vec<u8>;
76
77pub use crate::client::call_client;
78pub use crate::client::RpcClient;
79pub use crate::core::Rpc;
80pub use crate::core::RpcImpl;
81pub use crate::core::RpcName;
82pub use crate::core::RpcType;
83pub use crate::core::StoredRpc;
84pub use crate::server::RpcServer;
85pub use crate::transport::InternalTransport;
86pub use crate::transport::Transport;
87pub use crate::transport::TransportConfig;
88pub use crate::transport::TransportWireConfig;
89
90#[cfg(feature = "macros")]
91pub use pirates_macro_lib::rpc_definition;
92
93pub trait RpcDefinition<Name: RpcName, State, Q: RpcType, R: RpcType> {
94    fn client() -> Rpc<Name, Q, R>;
95    fn server() -> RpcImpl<Name, State, Q, R>;
96}
97
98#[cfg(test)]
99mod tests {
100    use crate::client::call_client;
101    use crate::core::{Rpc, RpcImpl, RpcName};
102    use crate::error::RpcResult;
103    use crate::server::RpcServer;
104    use crate::transport::{TransportConfig, TransportWireConfig};
105    use crate::RpcDefinition;
106    use serde::{Deserialize, Serialize};
107    use std::fmt::{Display, Formatter};
108    use std::sync::{Arc, Mutex};
109    use std::time::Duration;
110
111    pub struct HelloWorldState {
112        pub i: usize,
113    }
114
115    #[derive(Clone, Hash, Eq, PartialEq, Debug, Serialize, Deserialize)]
116    pub enum HelloWorldRpcName {
117        HelloWorld,
118        GetI,
119        IncrI,
120        MassiveRpc,
121        PreciseRpc,
122    }
123    impl Display for HelloWorldRpcName {
124        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
125            write!(f, "{:?}", self)
126        }
127    }
128    impl RpcName for HelloWorldRpcName {}
129
130    pub fn make_hello_world_rpc() -> Rpc<HelloWorldRpcName, String, String> {
131        Rpc::new(HelloWorldRpcName::HelloWorld)
132    }
133    pub fn make_hello_world_rpc_impl() -> RpcImpl<HelloWorldRpcName, HelloWorldState, String, String>
134    {
135        RpcImpl::new(
136            HelloWorldRpcName::HelloWorld,
137            Box::new(|state, q| {
138                println!("Hello World RPC Got Called! Query: {:?}", q);
139                Ok(format!("Hello world: {}:{:?}", state.i, q))
140            }),
141        )
142    }
143
144    pub fn make_get_i_rpc() -> Rpc<HelloWorldRpcName, (), usize> {
145        Rpc::new(HelloWorldRpcName::GetI)
146    }
147    pub fn make_get_i_rpc_impl() -> RpcImpl<HelloWorldRpcName, HelloWorldState, (), usize> {
148        RpcImpl::new(
149            HelloWorldRpcName::GetI,
150            Box::new(|state, q| {
151                println!("GetI RPC Got Called! Query: {:?}", q);
152                Ok(state.i)
153            }),
154        )
155    }
156
157    pub struct IncrIRpc {}
158    impl IncrIRpc {
159        fn implement(state: &mut HelloWorldState, _query: ()) -> RpcResult<()> {
160            state.i += 1;
161            Ok(())
162        }
163    }
164    impl RpcDefinition<HelloWorldRpcName, HelloWorldState, (), ()> for IncrIRpc {
165        fn client() -> Rpc<HelloWorldRpcName, (), ()> {
166            Rpc::new(HelloWorldRpcName::IncrI)
167        }
168
169        fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, (), ()> {
170            RpcImpl::new(HelloWorldRpcName::IncrI, Box::new(Self::implement))
171        }
172    }
173
174    pub struct MassiveRpc {}
175    impl MassiveRpc {
176        fn implement(_state: &mut HelloWorldState, query: usize) -> RpcResult<Vec<u32>> {
177            let mut v = Vec::new();
178            let mut i = 0;
179            while i < query {
180                v.push(1u32);
181                i += 1;
182            }
183            Ok(v)
184        }
185    }
186    impl RpcDefinition<HelloWorldRpcName, HelloWorldState, usize, Vec<u32>> for MassiveRpc {
187        fn client() -> Rpc<HelloWorldRpcName, usize, Vec<u32>> {
188            Rpc::new(HelloWorldRpcName::MassiveRpc)
189        }
190
191        fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, usize, Vec<u32>> {
192            RpcImpl::new(HelloWorldRpcName::MassiveRpc, Box::new(Self::implement))
193        }
194    }
195
196    #[derive(Clone, Debug, Serialize, Deserialize)]
197    pub struct PrecisePayload {
198        bulk_bytes: Vec<u32>,
199        padding: Vec<bool>,
200    }
201    pub struct PreciseRpc {}
202    impl PreciseRpc {
203        fn implement(_state: &mut HelloWorldState, query: usize) -> RpcResult<PrecisePayload> {
204            let mut v = Vec::new();
205            let mut i = 0;
206            while i < query {
207                v.push(1u32);
208                i += 1;
209            }
210            let mut padding = Vec::new();
211            let mut i = 0;
212            while i < 128 + 7 {
213                padding.push(true);
214                i += 1;
215            }
216            Ok(PrecisePayload {
217                bulk_bytes: v,
218                padding,
219            })
220        }
221    }
222    impl RpcDefinition<HelloWorldRpcName, HelloWorldState, usize, PrecisePayload> for PreciseRpc {
223        fn client() -> Rpc<HelloWorldRpcName, usize, PrecisePayload> {
224            Rpc::new(HelloWorldRpcName::PreciseRpc)
225        }
226
227        fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, usize, PrecisePayload> {
228            RpcImpl::new(HelloWorldRpcName::PreciseRpc, Box::new(Self::implement))
229        }
230    }
231
232    #[test]
233    fn just_server_test() {
234        let state = HelloWorldState { i: 3 };
235        let transport_config = TransportConfig {
236            rcv_timeout: Duration::from_secs(3),
237            wire_config: TransportWireConfig::Pickle(
238                serde_pickle::DeOptions::new(),
239                serde_pickle::SerOptions::new(),
240            ),
241        };
242        let mut server = RpcServer::new(Arc::new(Mutex::new(state)), transport_config);
243        server.add_rpc(Box::new(make_hello_world_rpc_impl()));
244        println!("Full Test");
245        let incoming_bytes =
246            serde_pickle::ser::to_vec(&"Foo", serde_pickle::SerOptions::new()).unwrap();
247        server
248            .call(&incoming_bytes, &HelloWorldRpcName::HelloWorld)
249            .unwrap();
250        server
251            .call(&incoming_bytes, &HelloWorldRpcName::HelloWorld)
252            .unwrap();
253    }
254
255    #[tokio::test]
256    async fn regular_server() {
257        // Server setup
258        println!("Server Setup");
259        let state = HelloWorldState { i: 3 };
260        let state_ref = Arc::new(Mutex::new(state));
261        let transport_config = TransportConfig::default();
262        let mut server = RpcServer::new(state_ref, transport_config);
263        server.add_rpc(Box::new(make_hello_world_rpc_impl()));
264        server.add_rpc(Box::new(make_get_i_rpc_impl()));
265        server.add_rpc(Box::new(IncrIRpc::server()));
266        let addr = "127.0.0.1:5555";
267
268        let hello_world_rpc = make_hello_world_rpc();
269        let get_i_rpc = make_get_i_rpc();
270        let incr_i_rpc = IncrIRpc::client();
271
272        let mut rpc_results = None;
273        let mut client_call_task = tokio::spawn(async move {
274            let r1 = call_client(addr, "foo".into(), hello_world_rpc.clone())
275                .await
276                .unwrap();
277            let r2 = call_client(addr, (), get_i_rpc.clone()).await.unwrap();
278            call_client(addr, (), incr_i_rpc.clone()).await.unwrap();
279            let r3 = call_client(addr, (), get_i_rpc).await.unwrap();
280            call_client(addr, (), incr_i_rpc).await.unwrap();
281            let r4 = call_client(addr, "bar".into(), hello_world_rpc)
282                .await
283                .unwrap();
284            (r1, r2, r3, r4)
285        });
286
287        while rpc_results.is_none() {
288            println!(".");
289            tokio::select! {
290                _ = server.serve(addr) => {},
291                client_output = &mut client_call_task => {rpc_results = Some(client_output)},
292            }
293        }
294
295        let (hello_world_1, get_i_1, get_i_2, hello_world_2) = rpc_results.unwrap().unwrap();
296        let expecting: String = "Hello world: 3:\"foo\"".into();
297        assert_eq!(expecting, hello_world_1);
298        assert_eq!(3usize, get_i_1);
299        assert_eq!(4usize, get_i_2);
300        let expecting2: String = "Hello world: 5:\"bar\"".into();
301        assert_eq!(expecting2, hello_world_2);
302    }
303
304    #[tokio::test]
305    async fn big_rpc_server() {
306        // Server setup
307        println!("Server Setup");
308        let state = HelloWorldState { i: 3 };
309        let state_ref = Arc::new(Mutex::new(state));
310        let mut server = RpcServer::new(state_ref, TransportConfig::default());
311        server.add_rpc(Box::new(MassiveRpc::server()));
312        server.add_rpc(Box::new(PreciseRpc::server()));
313        let addr = "127.0.0.1:5556";
314
315        let massive_rpc_client = MassiveRpc::client();
316        let precise_rpc_client = PreciseRpc::client();
317
318        let num_bulk = 170;
319        let mut rpc_results = None;
320        let mut client_call_task = tokio::spawn(async move {
321            let result = call_client(addr, 2000, massive_rpc_client.clone())
322                .await
323                .unwrap();
324            //
325            let result2: PrecisePayload = call_client(addr, num_bulk, precise_rpc_client)
326                .await
327                .unwrap();
328            (result.len(), result2.bulk_bytes.len())
329        });
330
331        while rpc_results.is_none() {
332            println!(".");
333            tokio::select! {
334                _ = server.serve(addr) => {},
335                client_output = &mut client_call_task => {rpc_results = Some(client_output)},
336            }
337        }
338
339        let (massive_len, slightly_smaller_len) = rpc_results.unwrap().unwrap();
340        assert_eq!(massive_len, 2000);
341        // which returns 10010 bytes = 8000 bytes + 2010 overhead?
342
343        assert_eq!(slightly_smaller_len, num_bulk);
344        // which returns 1286 bytes = 1024 + 262 overhead
345    }
346}