1mod client;
68mod core;
69pub mod error;
70mod rpc_types;
71mod server;
72mod transport;
73
74pub type Bytes<'a> = &'a [u8];
75pub type OwnedBytes = Vec<u8>;
76
77pub use crate::client::call_client;
78pub use crate::client::RpcClient;
79pub use crate::core::Rpc;
80pub use crate::core::RpcImpl;
81pub use crate::core::RpcName;
82pub use crate::core::RpcType;
83pub use crate::core::StoredRpc;
84pub use crate::server::RpcServer;
85pub use crate::transport::InternalTransport;
86pub use crate::transport::Transport;
87pub use crate::transport::TransportConfig;
88pub use crate::transport::TransportWireConfig;
89
90#[cfg(feature = "macros")]
91pub use pirates_macro_lib::rpc_definition;
92
93pub trait RpcDefinition<Name: RpcName, State, Q: RpcType, R: RpcType> {
94 fn client() -> Rpc<Name, Q, R>;
95 fn server() -> RpcImpl<Name, State, Q, R>;
96}
97
98#[cfg(test)]
99mod tests {
100 use crate::client::call_client;
101 use crate::core::{Rpc, RpcImpl, RpcName};
102 use crate::error::RpcResult;
103 use crate::server::RpcServer;
104 use crate::transport::{TransportConfig, TransportWireConfig};
105 use crate::RpcDefinition;
106 use serde::{Deserialize, Serialize};
107 use std::fmt::{Display, Formatter};
108 use std::sync::{Arc, Mutex};
109 use std::time::Duration;
110
111 pub struct HelloWorldState {
112 pub i: usize,
113 }
114
115 #[derive(Clone, Hash, Eq, PartialEq, Debug, Serialize, Deserialize)]
116 pub enum HelloWorldRpcName {
117 HelloWorld,
118 GetI,
119 IncrI,
120 MassiveRpc,
121 PreciseRpc,
122 }
123 impl Display for HelloWorldRpcName {
124 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
125 write!(f, "{:?}", self)
126 }
127 }
128 impl RpcName for HelloWorldRpcName {}
129
130 pub fn make_hello_world_rpc() -> Rpc<HelloWorldRpcName, String, String> {
131 Rpc::new(HelloWorldRpcName::HelloWorld)
132 }
133 pub fn make_hello_world_rpc_impl() -> RpcImpl<HelloWorldRpcName, HelloWorldState, String, String>
134 {
135 RpcImpl::new(
136 HelloWorldRpcName::HelloWorld,
137 Box::new(|state, q| {
138 println!("Hello World RPC Got Called! Query: {:?}", q);
139 Ok(format!("Hello world: {}:{:?}", state.i, q))
140 }),
141 )
142 }
143
144 pub fn make_get_i_rpc() -> Rpc<HelloWorldRpcName, (), usize> {
145 Rpc::new(HelloWorldRpcName::GetI)
146 }
147 pub fn make_get_i_rpc_impl() -> RpcImpl<HelloWorldRpcName, HelloWorldState, (), usize> {
148 RpcImpl::new(
149 HelloWorldRpcName::GetI,
150 Box::new(|state, q| {
151 println!("GetI RPC Got Called! Query: {:?}", q);
152 Ok(state.i)
153 }),
154 )
155 }
156
157 pub struct IncrIRpc {}
158 impl IncrIRpc {
159 fn implement(state: &mut HelloWorldState, _query: ()) -> RpcResult<()> {
160 state.i += 1;
161 Ok(())
162 }
163 }
164 impl RpcDefinition<HelloWorldRpcName, HelloWorldState, (), ()> for IncrIRpc {
165 fn client() -> Rpc<HelloWorldRpcName, (), ()> {
166 Rpc::new(HelloWorldRpcName::IncrI)
167 }
168
169 fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, (), ()> {
170 RpcImpl::new(HelloWorldRpcName::IncrI, Box::new(Self::implement))
171 }
172 }
173
174 pub struct MassiveRpc {}
175 impl MassiveRpc {
176 fn implement(_state: &mut HelloWorldState, query: usize) -> RpcResult<Vec<u32>> {
177 let mut v = Vec::new();
178 let mut i = 0;
179 while i < query {
180 v.push(1u32);
181 i += 1;
182 }
183 Ok(v)
184 }
185 }
186 impl RpcDefinition<HelloWorldRpcName, HelloWorldState, usize, Vec<u32>> for MassiveRpc {
187 fn client() -> Rpc<HelloWorldRpcName, usize, Vec<u32>> {
188 Rpc::new(HelloWorldRpcName::MassiveRpc)
189 }
190
191 fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, usize, Vec<u32>> {
192 RpcImpl::new(HelloWorldRpcName::MassiveRpc, Box::new(Self::implement))
193 }
194 }
195
196 #[derive(Clone, Debug, Serialize, Deserialize)]
197 pub struct PrecisePayload {
198 bulk_bytes: Vec<u32>,
199 padding: Vec<bool>,
200 }
201 pub struct PreciseRpc {}
202 impl PreciseRpc {
203 fn implement(_state: &mut HelloWorldState, query: usize) -> RpcResult<PrecisePayload> {
204 let mut v = Vec::new();
205 let mut i = 0;
206 while i < query {
207 v.push(1u32);
208 i += 1;
209 }
210 let mut padding = Vec::new();
211 let mut i = 0;
212 while i < 128 + 7 {
213 padding.push(true);
214 i += 1;
215 }
216 Ok(PrecisePayload {
217 bulk_bytes: v,
218 padding,
219 })
220 }
221 }
222 impl RpcDefinition<HelloWorldRpcName, HelloWorldState, usize, PrecisePayload> for PreciseRpc {
223 fn client() -> Rpc<HelloWorldRpcName, usize, PrecisePayload> {
224 Rpc::new(HelloWorldRpcName::PreciseRpc)
225 }
226
227 fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, usize, PrecisePayload> {
228 RpcImpl::new(HelloWorldRpcName::PreciseRpc, Box::new(Self::implement))
229 }
230 }
231
232 #[test]
233 fn just_server_test() {
234 let state = HelloWorldState { i: 3 };
235 let transport_config = TransportConfig {
236 rcv_timeout: Duration::from_secs(3),
237 wire_config: TransportWireConfig::Pickle(
238 serde_pickle::DeOptions::new(),
239 serde_pickle::SerOptions::new(),
240 ),
241 };
242 let mut server = RpcServer::new(Arc::new(Mutex::new(state)), transport_config);
243 server.add_rpc(Box::new(make_hello_world_rpc_impl()));
244 println!("Full Test");
245 let incoming_bytes =
246 serde_pickle::ser::to_vec(&"Foo", serde_pickle::SerOptions::new()).unwrap();
247 server
248 .call(&incoming_bytes, &HelloWorldRpcName::HelloWorld)
249 .unwrap();
250 server
251 .call(&incoming_bytes, &HelloWorldRpcName::HelloWorld)
252 .unwrap();
253 }
254
255 #[tokio::test]
256 async fn regular_server() {
257 println!("Server Setup");
259 let state = HelloWorldState { i: 3 };
260 let state_ref = Arc::new(Mutex::new(state));
261 let transport_config = TransportConfig::default();
262 let mut server = RpcServer::new(state_ref, transport_config);
263 server.add_rpc(Box::new(make_hello_world_rpc_impl()));
264 server.add_rpc(Box::new(make_get_i_rpc_impl()));
265 server.add_rpc(Box::new(IncrIRpc::server()));
266 let addr = "127.0.0.1:5555";
267
268 let hello_world_rpc = make_hello_world_rpc();
269 let get_i_rpc = make_get_i_rpc();
270 let incr_i_rpc = IncrIRpc::client();
271
272 let mut rpc_results = None;
273 let mut client_call_task = tokio::spawn(async move {
274 let r1 = call_client(addr, "foo".into(), hello_world_rpc.clone())
275 .await
276 .unwrap();
277 let r2 = call_client(addr, (), get_i_rpc.clone()).await.unwrap();
278 call_client(addr, (), incr_i_rpc.clone()).await.unwrap();
279 let r3 = call_client(addr, (), get_i_rpc).await.unwrap();
280 call_client(addr, (), incr_i_rpc).await.unwrap();
281 let r4 = call_client(addr, "bar".into(), hello_world_rpc)
282 .await
283 .unwrap();
284 (r1, r2, r3, r4)
285 });
286
287 while rpc_results.is_none() {
288 println!(".");
289 tokio::select! {
290 _ = server.serve(addr) => {},
291 client_output = &mut client_call_task => {rpc_results = Some(client_output)},
292 }
293 }
294
295 let (hello_world_1, get_i_1, get_i_2, hello_world_2) = rpc_results.unwrap().unwrap();
296 let expecting: String = "Hello world: 3:\"foo\"".into();
297 assert_eq!(expecting, hello_world_1);
298 assert_eq!(3usize, get_i_1);
299 assert_eq!(4usize, get_i_2);
300 let expecting2: String = "Hello world: 5:\"bar\"".into();
301 assert_eq!(expecting2, hello_world_2);
302 }
303
304 #[tokio::test]
305 async fn big_rpc_server() {
306 println!("Server Setup");
308 let state = HelloWorldState { i: 3 };
309 let state_ref = Arc::new(Mutex::new(state));
310 let mut server = RpcServer::new(state_ref, TransportConfig::default());
311 server.add_rpc(Box::new(MassiveRpc::server()));
312 server.add_rpc(Box::new(PreciseRpc::server()));
313 let addr = "127.0.0.1:5556";
314
315 let massive_rpc_client = MassiveRpc::client();
316 let precise_rpc_client = PreciseRpc::client();
317
318 let num_bulk = 170;
319 let mut rpc_results = None;
320 let mut client_call_task = tokio::spawn(async move {
321 let result = call_client(addr, 2000, massive_rpc_client.clone())
322 .await
323 .unwrap();
324 let result2: PrecisePayload = call_client(addr, num_bulk, precise_rpc_client)
326 .await
327 .unwrap();
328 (result.len(), result2.bulk_bytes.len())
329 });
330
331 while rpc_results.is_none() {
332 println!(".");
333 tokio::select! {
334 _ = server.serve(addr) => {},
335 client_output = &mut client_call_task => {rpc_results = Some(client_output)},
336 }
337 }
338
339 let (massive_len, slightly_smaller_len) = rpc_results.unwrap().unwrap();
340 assert_eq!(massive_len, 2000);
341 assert_eq!(slightly_smaller_len, num_bulk);
344 }
346}