Expand description
krpc
This is an RPC library that includes both a client and a server. It has the following characteristics:
- supports multi-threading
- uses asynchronous communication via tokio
- supports Unix and TCP sockets
- supports client subscribe and server publish
- easy to use
server example
use tokio::time;
// Server example entry point: creates a `krpc::Server`, binds several named
// handlers, spawns a task that publishes to topic "sub", then awaits
// `service.run` and returns its result.
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
let mut service = krpc::Server::new();
// Handler that takes no arguments and returns nothing.
service.bind(
"test_notargs_and_notret",
krpc::callback!(|| {
println!("test_notargs_and_notret called");
}),
);
// Handler that takes one `String` argument and returns nothing.
service.bind(
"test_notret",
krpc::callback!(|s: String| {
println!("str is {}", s);
}),
);
// If the callback returns early, `@with_return` must be added
service.bind(
"test_notret2",
krpc::callback!(@with_return |s: String| {
if s.is_empty() {
return;
}
println!("str is {}", s);
}),
);
// Handler that takes no arguments and returns a `String`.
service.bind(
"test_notargs",
krpc::callback!(|| -> String { String::from("test_notargs called!") }),
);
// Handler that takes four arguments and returns them formatted as one `String`.
service.bind(
"test_has_args_and_ret",
krpc::callback!(|a0: i32, a1: i16, a2: i8, a3: bool| -> String {
format!("{} {} {} {}", a0, a1, a2, a3)
}),
);
let clone_string = String::from("abcd");
// NOTE(review): `krpc::clone!` presumably hands the callback its own copy of
// `clone_string`; the original is still used by the `println!` below — confirm.
service.bind(
"test_with_clone",
krpc::callback!(krpc::clone!(clone_string), || -> String {
clone_string
}),
);
println!("clone string is {}", clone_string);
let publisher = service.publisher();
// Background task: pushes ("hello", 100) to topic "sub", then waits for the
// next tick of a 1-second interval, forever.
tokio::spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(1));
loop {
publisher.push("sub", ("hello", 100));
interval.tick().await;
}
});
println!("server start");
// NOTE(review): "/tmp/local/unix" is presumably a unix socket path — confirm
// against `Server::run`.
service.run("/tmp/local/unix").await
}
client example1
Use the macro define!
to define a new type with some methods, then call or subscribe through its methods
use krpc::*;
use std::sync::Arc;
use tokio::time;
// Unit type used by the client examples as a subscription callback target.
struct MySub;

impl SubcribeCallback<(String, i32)> for MySub {
    // Checks that the received payload is ("hello", 100) and returns `false`.
    // NOTE(review): what the returned `bool` means to the subscriber loop is
    // defined by `SubcribeCallback`, which is not visible here.
    fn callback(&mut self, data: (String, i32)) -> bool {
        let (text, number) = data;
        assert_eq!(&text, "hello");
        assert_eq!(number, 100i32);
        false
    }
}
// Declares the client type `RPC` through krpc's `define!` macro: four `fn`
// entries whose names match the handlers bound in the server example, plus two
// `sub` entries (one taking a closure, one taking `&mut MySub`).
// NOTE(review): the leading `unix` presumably selects the unix-socket
// transport — confirm against the `define!` documentation.
define!(unix, RPC,
fn test_notargs_and_notret(),
fn test_notret(s:&'static str),
fn test_notargs()->String,
fn test_has_args_and_ret(a:i32,b:i16,c:i8,d:bool)->String,
sub sub1(topic:&'static str, f:impl FnMut((String,i32))),
sub sub2(topic:&'static str, v:&mut MySub)
);
// Client example 1 entry point: connects through the `RPC` type, calls each
// declared method and checks the returned strings, then runs both
// subscriptions in a spawned task for three seconds before aborting it.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
// Wrapped in `Arc` so the spawned task can clone a handle per loop iteration.
let rpc = Arc::new(RPC::new("/tmp/local/unix").await?);
let _ = rpc.heartbeat().await?; //check whether the server exists
let _ = rpc.test_notargs_and_notret().await?;
let _ = rpc.test_notret("hello client").await?;
let ret = rpc.test_notargs().await?;
assert_eq!(&ret, "test_notargs called!");
let ret = rpc.test_has_args_and_ret(10000, 1000, 10, true).await?;
assert_eq!(&ret, "10000 1000 10 true");
let task = tokio::spawn(async move {
let mut v = MySub;
loop {
let c1 = rpc.clone();
let c2 = rpc.clone();
// Race the closure-based and the `MySub`-based subscriptions on topic "sub".
tokio::select! {
// Whatever `sub1` resolves to ends the loop and becomes the task's output.
e = c1.sub1("sub", |(s, id)| {
assert_eq!(&s, "hello");
assert_eq!(id, 100i32);
println!("xxx");
}) => break e,
// This branch is taken only when `sub2` resolves to an `Err`.
Err(e) = c2.sub2("sub", &mut v) => break Err(e),
}
}
});
// Let the subscriptions run for three seconds, then cancel the task.
time::sleep(time::Duration::new(3, 0)).await;
task.abort();
Ok(())
}
client example2
Use the macro call!
to call, and the macro subcribe!
to subscribe directly
use krpc::*;
// Client example 2 entry point: issues each RPC directly with `call!` and
// subscribes with `subcribe!`, without declaring a client type first.
// NOTE(review): `MySub` and `time` are not declared in this snippet (only
// `use krpc::*;` is visible); presumably they come from client example1 or a
// krpc re-export — confirm.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
let _ = call!(unix, "/tmp/local/unix", test_notargs_and_notret()).await?;
let _ = call!(unix, "/tmp/local/unix", test_notret("hello client")).await?;
// A `-> String` suffix is given for the calls whose result is used.
let ret = call!(unix, "/tmp/local/unix", test_notargs() -> String).await?;
assert_eq!(&ret, "test_notargs called!");
let ret = call!(unix, "/tmp/local/unix", test_has_args_and_ret(10000, 1000, 10, true) -> String).await?;
assert_eq!(&ret, "10000 1000 10 true");
//test subcribe with trait
let mut sub = MySub;
// Awaited to completion (with `?`) before the lambda subscription is spawned.
subcribe!(unix, "/tmp/local/unix", "sub", &mut sub).await?;
let task = tokio::spawn(async move {
//test subcribe with lambda
subcribe!(unix, "/tmp/local/unix", "sub", |s: String, v: i32| {
assert_eq!(&s, "hello");
assert_eq!(v, 100i32);
})
.await
});
// Let the lambda subscription run for three seconds, then cancel the task.
time::sleep(time::Duration::new(3, 0)).await;
task.abort();
Ok(())
}
Re-exports
Modules
- RPC客户端
- RPC通信消息
- RPC服务端
Macros
- RPC调用
- 用以声明回调函数,主要用于服务端绑定lambda函数(支持同步和异步)
- 定义RPC Client的新类型
- 用以定义新类型
- 订阅主题