Crate krpc

source ·
Expand description

krpc

This is an RPC library that includes both a client and a server. It has the following characteristics:

  1. supports multi-threading
  2. uses asynchronous communication via tokio
  3. supports unix and tcp sockets
  4. supports client subscribe and server publish
  5. easy to use

server example

use tokio::time;

// Server example: binds several named callbacks on a `krpc::Server`, spawns a
// task that repeatedly publishes to the "sub" topic, then runs the server on
// "/tmp/local/unix" (the same address the client examples connect to).
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
	let mut service = krpc::Server::new();
	// Callback with no arguments and no return value.
	service.bind(
		"test_notargs_and_notret",
		krpc::callback!(|| {
			println!("test_notargs_and_notret called");
		}),
	);
	// Callback with one argument and no return value.
	service.bind(
		"test_notret",
		krpc::callback!(|s: String| {
			println!("str is {}", s);
		}),
	);
	// If the callback returns early, `@with_return` must be added.
	service.bind(
		"test_notret2",
		krpc::callback!(@with_return |s: String| {
			if s.is_empty() {
				return;
			}
			println!("str is {}", s);
		}),
	);
	// Callback with no arguments that returns a `String`.
	service.bind(
		"test_notargs",
		krpc::callback!(|| -> String { String::from("test_notargs called!") }),
	);
	// Callback with several arguments that returns them formatted as one `String`.
	service.bind(
		"test_has_args_and_ret",
		krpc::callback!(|a0: i32, a1: i16, a2: i8, a3: bool| -> String {
			format!("{} {} {} {}", a0, a1, a2, a3)
		}),
	);
	let clone_string = String::from("abcd");
	// `krpc::clone!` presumably hands the callback its own copy of `clone_string`
	// (TODO confirm against the macro); the original is still used by the
	// `println!` after this `bind`.
	service.bind(
		"test_with_clone",
		krpc::callback!(krpc::clone!(clone_string), || -> String {
			clone_string
		}),
	);
	println!("clone string is {}", clone_string);
	let publisher = service.publisher();
	// Background task: push `("hello", 100)` to the "sub" topic, then wait for
	// the next tick of a 1-second interval, forever.
	tokio::spawn(async move {
		let mut interval = time::interval(time::Duration::from_secs(1));
		loop {
			publisher.push("sub", ("hello", 100));
			interval.tick().await;
		}
	});
	println!("server start");
	// The result of `run` is the return value of `main`.
	service.run("/tmp/local/unix").await
}

client example1

Use the `define!` macro to define a new type with methods, then call or subscribe through its methods.

use krpc::*;
use std::sync::Arc;
use tokio::time;

/// Subscriber type passed as `&mut MySub` to the `sub2` subscription.
struct MySub;

impl SubcribeCallback<(String, i32)> for MySub {
	/// Asserts that the received payload is `("hello", 100)` and returns `false`.
	///
	/// NOTE(review): the meaning of the returned `bool` is defined by
	/// `SubcribeCallback`, which is not shown here — confirm before relying on it.
	fn callback(&mut self, data: (String, i32)) -> bool {
		let (text, number) = data;
		assert_eq!(text.as_str(), "hello");
		assert_eq!(number, 100);
		false
	}
}

// Declares the client type `RPC` for the `unix` transport. Each `fn` entry
// names a remote call and each `sub` entry names a subscription; `main` below
// invokes them as methods on an `RPC` value (e.g. `rpc.test_notargs()`,
// `rpc.sub1(...)`).
define!(unix, RPC,
	fn test_notargs_and_notret(),
	fn test_notret(s:&'static str),
	fn test_notargs()->String,
	fn test_has_args_and_ret(a:i32,b:i16,c:i8,d:bool)->String,
	sub sub1(topic:&'static str, f:impl FnMut((String,i32))),
	sub sub2(topic:&'static str, v:&mut MySub)
);

// Client example 1: connects through the `RPC` type declared with `define!`,
// exercises each remote call, then subscribes to the "sub" topic in a spawned
// task that is aborted after three seconds.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
	// Wrapped in `Arc` so the handle can be cloned inside the spawned task below.
	let rpc = Arc::new(RPC::new("/tmp/local/unix").await?);
	let _ = rpc.heartbeat().await?; // check whether the server is present
	let _ = rpc.test_notargs_and_notret().await?;
	let _ = rpc.test_notret("hello client").await?;
	let ret = rpc.test_notargs().await?;
	assert_eq!(&ret, "test_notargs called!");
	let ret = rpc.test_has_args_and_ret(10000, 1000, 10, true).await?;
	assert_eq!(&ret, "10000 1000 10 true");
	let task = tokio::spawn(async move {
		let mut v = MySub;
		loop {
			// Fresh clones each iteration, one per subscription future.
			let c1 = rpc.clone();
			let c2 = rpc.clone();
			// Race a closure-based subscription against a `MySub`-based one.
			tokio::select! {
				// Whatever `sub1` resolves to ends the loop.
				e = c1.sub1("sub", |(s, id)| {
					assert_eq!(&s, "hello");
					assert_eq!(id, 100i32);
					println!("xxx");
				}) => break e,
				// This branch is taken only when `sub2` resolves to `Err`.
				Err(e) = c2.sub2("sub", &mut v) => break Err(e),
			}
		}
	});
	// Let the subscriptions run for three seconds, then cancel the task.
	time::sleep(time::Duration::new(3, 0)).await;
	task.abort();
	Ok(())
}

client example2

Use the `call!` macro to call and the `subcribe!` macro to subscribe directly.

use krpc::*;

// Client example 2: uses the `call!` and `subcribe!` macros directly, passing
// the transport (`unix`) and address on every invocation instead of going
// through a `define!`-generated type.
// NOTE(review): `MySub` and `time` are neither defined nor imported in this
// snippet; presumably they come from client example1 (`struct MySub`,
// `use tokio::time;`) — confirm.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
	let _ = call!(unix, "/tmp/local/unix", test_notargs_and_notret()).await?;
	let _ = call!(unix, "/tmp/local/unix", test_notret("hello client")).await?;
	// A `-> Type` suffix after the call names the expected return type.
	let ret = call!(unix, "/tmp/local/unix", test_notargs() -> String).await?;
	assert_eq!(&ret, "test_notargs called!");
	let ret = call!(unix, "/tmp/local/unix", test_has_args_and_ret(10000, 1000, 10, true) -> String).await?;
	assert_eq!(&ret, "10000 1000 10 true");
	// Subscribe with a `SubcribeCallback` trait implementation.
	let mut sub = MySub;
	subcribe!(unix, "/tmp/local/unix", "sub", &mut sub).await?;
	let task = tokio::spawn(async move {
		// Subscribe with a closure.
		subcribe!(unix, "/tmp/local/unix", "sub", |s: String, v: i32| {
			assert_eq!(&s, "hello");
			assert_eq!(v, 100i32);
		})
		.await
	});
	// Let the closure subscription run for three seconds, then cancel the task.
	time::sleep(time::Duration::new(3, 0)).await;
	task.abort();
	Ok(())
}

Re-exports

Modules

Macros