mod client;
mod core;
pub mod error;
mod rpc_types;
mod server;
mod transport;
pub type Bytes<'a> = &'a [u8];
pub type OwnedBytes = Vec<u8>;
pub use crate::client::call_client;
pub use crate::client::RpcClient;
pub use crate::core::Rpc;
pub use crate::core::RpcImpl;
pub use crate::core::RpcName;
pub use crate::core::RpcType;
pub use crate::core::StoredRpc;
pub use crate::server::RpcServer;
pub use crate::transport::InternalTransport;
pub use crate::transport::Transport;
pub use crate::transport::TransportConfig;
pub use crate::transport::TransportWireConfig;
#[cfg(feature = "macros")]
pub use pirates_macro_lib::rpc_definition;
pub trait RpcDefinition<Name: RpcName, State, Q: RpcType, R: RpcType> {
fn client() -> Rpc<Name, Q, R>;
fn server() -> RpcImpl<Name, State, Q, R>;
}
#[cfg(test)]
mod tests {
use crate::client::call_client;
use crate::core::{Rpc, RpcImpl, RpcName};
use crate::error::RpcResult;
use crate::server::RpcServer;
use crate::transport::{TransportConfig, TransportWireConfig};
use crate::RpcDefinition;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::sync::{Arc, Mutex};
use std::time::Duration;
pub struct HelloWorldState {
pub i: usize,
}
#[derive(Clone, Hash, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum HelloWorldRpcName {
HelloWorld,
GetI,
IncrI,
MassiveRpc,
PreciseRpc,
}
impl Display for HelloWorldRpcName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl RpcName for HelloWorldRpcName {}
pub fn make_hello_world_rpc() -> Rpc<HelloWorldRpcName, String, String> {
Rpc::new(HelloWorldRpcName::HelloWorld)
}
pub fn make_hello_world_rpc_impl() -> RpcImpl<HelloWorldRpcName, HelloWorldState, String, String>
{
RpcImpl::new(
HelloWorldRpcName::HelloWorld,
Box::new(|state, q| {
println!("Hello World RPC Got Called! Query: {:?}", q);
Ok(format!("Hello world: {}:{:?}", state.i, q))
}),
)
}
pub fn make_get_i_rpc() -> Rpc<HelloWorldRpcName, (), usize> {
Rpc::new(HelloWorldRpcName::GetI)
}
pub fn make_get_i_rpc_impl() -> RpcImpl<HelloWorldRpcName, HelloWorldState, (), usize> {
RpcImpl::new(
HelloWorldRpcName::GetI,
Box::new(|state, q| {
println!("GetI RPC Got Called! Query: {:?}", q);
Ok(state.i)
}),
)
}
pub struct IncrIRpc {}
impl IncrIRpc {
fn implement(state: &mut HelloWorldState, _query: ()) -> RpcResult<()> {
state.i += 1;
Ok(())
}
}
impl RpcDefinition<HelloWorldRpcName, HelloWorldState, (), ()> for IncrIRpc {
fn client() -> Rpc<HelloWorldRpcName, (), ()> {
Rpc::new(HelloWorldRpcName::IncrI)
}
fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, (), ()> {
RpcImpl::new(HelloWorldRpcName::IncrI, Box::new(Self::implement))
}
}
pub struct MassiveRpc {}
impl MassiveRpc {
fn implement(_state: &mut HelloWorldState, query: usize) -> RpcResult<Vec<u32>> {
let mut v = Vec::new();
let mut i = 0;
while i < query {
v.push(1u32);
i += 1;
}
Ok(v)
}
}
impl RpcDefinition<HelloWorldRpcName, HelloWorldState, usize, Vec<u32>> for MassiveRpc {
fn client() -> Rpc<HelloWorldRpcName, usize, Vec<u32>> {
Rpc::new(HelloWorldRpcName::MassiveRpc)
}
fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, usize, Vec<u32>> {
RpcImpl::new(HelloWorldRpcName::MassiveRpc, Box::new(Self::implement))
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PrecisePayload {
bulk_bytes: Vec<u32>,
padding: Vec<bool>,
}
pub struct PreciseRpc {}
impl PreciseRpc {
fn implement(_state: &mut HelloWorldState, query: usize) -> RpcResult<PrecisePayload> {
let mut v = Vec::new();
let mut i = 0;
while i < query {
v.push(1u32);
i += 1;
}
let mut padding = Vec::new();
let mut i = 0;
while i < 128 + 7 {
padding.push(true);
i += 1;
}
Ok(PrecisePayload {
bulk_bytes: v,
padding,
})
}
}
impl RpcDefinition<HelloWorldRpcName, HelloWorldState, usize, PrecisePayload> for PreciseRpc {
fn client() -> Rpc<HelloWorldRpcName, usize, PrecisePayload> {
Rpc::new(HelloWorldRpcName::PreciseRpc)
}
fn server() -> RpcImpl<HelloWorldRpcName, HelloWorldState, usize, PrecisePayload> {
RpcImpl::new(HelloWorldRpcName::PreciseRpc, Box::new(Self::implement))
}
}
#[test]
fn just_server_test() {
let state = HelloWorldState { i: 3 };
let transport_config = TransportConfig {
rcv_timeout: Duration::from_secs(3),
wire_config: TransportWireConfig::Pickle(
serde_pickle::DeOptions::new(),
serde_pickle::SerOptions::new(),
),
};
let mut server = RpcServer::new(Arc::new(Mutex::new(state)), transport_config);
server.add_rpc(Box::new(make_hello_world_rpc_impl()));
println!("Full Test");
let incoming_bytes =
serde_pickle::ser::to_vec(&"Foo", serde_pickle::SerOptions::new()).unwrap();
server
.call(&incoming_bytes, &HelloWorldRpcName::HelloWorld)
.unwrap();
server
.call(&incoming_bytes, &HelloWorldRpcName::HelloWorld)
.unwrap();
}
#[tokio::test]
async fn regular_server() {
println!("Server Setup");
let state = HelloWorldState { i: 3 };
let state_ref = Arc::new(Mutex::new(state));
let transport_config = TransportConfig::default();
let mut server = RpcServer::new(state_ref, transport_config);
server.add_rpc(Box::new(make_hello_world_rpc_impl()));
server.add_rpc(Box::new(make_get_i_rpc_impl()));
server.add_rpc(Box::new(IncrIRpc::server()));
let addr = "127.0.0.1:5555";
let hello_world_rpc = make_hello_world_rpc();
let get_i_rpc = make_get_i_rpc();
let incr_i_rpc = IncrIRpc::client();
let mut rpc_results = None;
let mut client_call_task = tokio::spawn(async move {
let r1 = call_client(addr, "foo".into(), hello_world_rpc.clone())
.await
.unwrap();
let r2 = call_client(addr, (), get_i_rpc.clone()).await.unwrap();
call_client(addr, (), incr_i_rpc.clone()).await.unwrap();
let r3 = call_client(addr, (), get_i_rpc).await.unwrap();
call_client(addr, (), incr_i_rpc).await.unwrap();
let r4 = call_client(addr, "bar".into(), hello_world_rpc)
.await
.unwrap();
(r1, r2, r3, r4)
});
while rpc_results.is_none() {
println!(".");
tokio::select! {
_ = server.serve(addr) => {},
client_output = &mut client_call_task => {rpc_results = Some(client_output)},
}
}
let (hello_world_1, get_i_1, get_i_2, hello_world_2) = rpc_results.unwrap().unwrap();
let expecting: String = "Hello world: 3:\"foo\"".into();
assert_eq!(expecting, hello_world_1);
assert_eq!(3usize, get_i_1);
assert_eq!(4usize, get_i_2);
let expecting2: String = "Hello world: 5:\"bar\"".into();
assert_eq!(expecting2, hello_world_2);
}
#[tokio::test]
async fn big_rpc_server() {
println!("Server Setup");
let state = HelloWorldState { i: 3 };
let state_ref = Arc::new(Mutex::new(state));
let mut server = RpcServer::new(state_ref, TransportConfig::default());
server.add_rpc(Box::new(MassiveRpc::server()));
server.add_rpc(Box::new(PreciseRpc::server()));
let addr = "127.0.0.1:5556";
let massive_rpc_client = MassiveRpc::client();
let precise_rpc_client = PreciseRpc::client();
let num_bulk = 170;
let mut rpc_results = None;
let mut client_call_task = tokio::spawn(async move {
let result = call_client(addr, 2000, massive_rpc_client.clone())
.await
.unwrap();
let result2: PrecisePayload = call_client(addr, num_bulk, precise_rpc_client)
.await
.unwrap();
(result.len(), result2.bulk_bytes.len())
});
while rpc_results.is_none() {
println!(".");
tokio::select! {
_ = server.serve(addr) => {},
client_output = &mut client_call_task => {rpc_results = Some(client_output)},
}
}
let (massive_len, slightly_smaller_len) = rpc_results.unwrap().unwrap();
assert_eq!(massive_len, 2000);
assert_eq!(slightly_smaller_len, num_bulk);
}
}