krpc/
client.rs

1use super::msg::*;
2use std::collections::HashMap;
3use std::future::Future;
4pub use std::ops::Deref;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU32, Ordering};
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::net::TcpStream;
9#[cfg(unix)]
10use tokio::net::UnixStream;
11use tokio::sync::{mpsc, oneshot};
12
13/// 异步trait的返回类型
14type Return<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
15/// 异步trait主要用以发送RPC请求并接收服务端回复
16pub trait Call {
17	fn call<'a>(&'a self, id: u32, data: Bytes) -> Return<'a, Result<Msg, Error>>;
18}
19
20/// 用以订阅回调处理
21pub trait SubcribeCallback<T> {
22	/// 返回值为true表示一直订阅,返回false表示只订阅一次
23	fn callback(&mut self, data: T) -> bool;
24}
25
26pub struct MyStream {
27	tx: mpsc::Sender<(u32, oneshot::Sender<Result<Msg, Error>>, Bytes)>,
28}
29
30impl MyStream {
31	/// 创建多线程流
32	fn new<Stream: AsyncReadExt + AsyncWriteExt + std::marker::Unpin + Send + 'static>(mut stream: Stream) -> Self {
33		let (tx, mut rx) = mpsc::channel::<(u32, oneshot::Sender<Result<Msg, Error>>, Bytes)>(1);
34		tokio::spawn(async move {
35			let mut header = [0u8; RPC_HEADER_LEN];
36			let mut callers = HashMap::<u32, oneshot::Sender<Result<Msg, Error>>>::new();
37			let error = loop {
38				tokio::select! {
39					Some((id, otx, data)) = rx.recv() => {
40						callers.insert(id, otx);
41						if let Err(err) = stream.write_all(&data[..]).await {
42							break Error::from(err);
43						}
44					}
45					ret = stream.read_exact(&mut header[..]) => {
46						match ret {
47							Ok(0) => break Error::new("对端已关闭读取数据长度为0"),
48							Ok(_) => {
49								if let Ok(mut msg) = Msg::decode(&header[..]) {
50									match msg.mode() {
51										Mode::Respond | Mode::Publish => {
52											if let Some(buf) = msg.body() {
53												//读取消息体
54												if let Err(err) = stream.read_exact(buf).await {
55													break Error::from(err);
56												}
57											}
58										}
59										_ => (),
60									}
61									if let Some(otx) = callers.remove(&msg.id()) {
62										let _ = otx.send(Ok(msg));
63									}
64								}
65							}
66							Err(err) => break Error::from(err),
67						}
68					}
69				}
70			};
71			//通知所有调用者出错了
72			callers.into_iter().for_each(|(_, otx)| {
73				let _ = otx.send(Err(error.clone()));
74			});
75		});
76		Self { tx }
77	}
78}
79/// 为多线程流实现Call trait
80impl Call for MyStream {
81	fn call<'a>(&'a self, id: u32, data: Bytes) -> Return<'a, Result<Msg, Error>> {
82		Box::pin(async move {
83			let (tx, rx) = oneshot::channel();
84			let _ = self.tx.send((id, tx, data)).await;
85			rx.await.unwrap()
86		})
87	}
88}
89
90pub struct Client<Stream: Call + Send> {
91	stream: Stream,
92	id: AtomicU32,
93}
94
95const HEARTBEAT: &'static str = "heartbeat";
96
97impl<Stream: Call + Send> Client<Stream> {
98	/// 创建Client
99	pub fn new(stream: Stream) -> Self {
100		Self { stream, id: AtomicU32::new(0) }
101	}
102	/// 编码heartbeat请求
103	fn encode_heartbeat(&self) -> (u32, Bytes) {
104		let id = self.id.fetch_add(1, Ordering::SeqCst);
105		let msg = Msg::new(id, HEARTBEAT);
106		let data = msg.encode_without_body(Mode::HeartBeat);
107		(id, data)
108	}
109	/// 编码无参数请求
110	fn encode_without_arg(&self, name: &str) -> (u32, Bytes) {
111		let id = self.id.fetch_add(1, Ordering::SeqCst);
112		let msg = Msg::new(id, name);
113		let data = msg.encode_without_body(Mode::Request);
114		(id, data)
115	}
116	/// 编码有参数请求
117	fn encode_with_arg<Args>(&self, name: &str, args: Args) -> (u32, Bytes)
118	where
119		Args: serde::ser::Serialize,
120	{
121		let id = self.id.fetch_add(1, Ordering::SeqCst);
122		let msg = Msg::new(id, name);
123		let data = msg.encode(Mode::Request, &args);
124		(id, data)
125	}
126	/// 解码heartbeat回复
127	fn decode_heartbeat(msg: Msg) -> Result<(), Error> {
128		if msg.name() != HEARTBEAT.as_bytes() {
129			return Err(Error::new("心跳回复函数名称不匹配"));
130		}
131		if !msg.headeronly() {
132			return Err(Error::new("心跳不应当返回消息体"));
133		}
134		match msg.mode() {
135			Mode::HeartBeat => Ok(()),
136			_ => Err(Error::new("返回消息模式不正确")),
137		}
138	}
139	/// 解码无返回值回复
140	fn decode_without_ret(msg: Msg, name: &str) -> Result<(), Error> {
141		if msg.name() != name.as_bytes() {
142			return Err(Error::new("回复函数名称不匹配"));
143		}
144		if !msg.headeronly() {
145			return Err(Error::new("不应当返回消息体"));
146		}
147		match msg.mode() {
148			Mode::Respond => Ok(()),
149			Mode::NotFound => Err(Error::new("没有找到相应的函数")),
150			Mode::NotMatch => Err(Error::new("函数参数不匹配")),
151			Mode::NoAccess => Err(Error::new("没有权限")),
152			_ => Err(Error::new("返回消息模式不正确")),
153		}
154	}
155	/// 解码有返回值回复
156	fn decode_with_ret<Ret>(msg: Msg, name: &str) -> Result<Ret, Error>
157	where
158		Ret: for<'a> serde::de::Deserialize<'a>,
159	{
160		if msg.name() != name.as_bytes() {
161			return Err(Error::new("回复函数名称不匹配"));
162		}
163		match msg.mode() {
164			Mode::Respond => {
165				if msg.headeronly() {
166					Err(Error::new("没有消息体"))
167				} else {
168					msg.parse()
169				}
170			}
171			Mode::NotFound => Err(Error::new("没有找到相应的函数")),
172			Mode::NotMatch => Err(Error::new("函数参数不匹配")),
173			Mode::NoAccess => Err(Error::new("没有权限")),
174			_ => Err(Error::new("返回消息模式不正确")),
175		}
176	}
177
178	/// 心跳检测
179	#[inline]
180	pub async fn heartbeat(&self) -> Result<(), Error> {
181		let (id, data) = self.encode_heartbeat();
182		let msg = self.stream.call(id, data).await?;
183		Self::decode_heartbeat(msg)
184	}
185	/// 无参数无返回值的函数调用
186	#[inline]
187	pub async fn call_without_arg_ret(&self, name: &str) -> Result<(), Error> {
188		let (id, data) = self.encode_without_arg(name);
189		let msg = self.stream.call(id, data).await?;
190		Self::decode_without_ret(msg, name)
191	}
192	/// 无参数有返回值的函数调用
193	#[inline]
194	pub async fn call_with_ret<Ret>(&self, name: &str) -> Result<Ret, Error>
195	where
196		Ret: for<'a> serde::de::Deserialize<'a>,
197	{
198		let (id, data) = self.encode_without_arg(name);
199		let msg = self.stream.call(id, data).await?;
200		Self::decode_with_ret(msg, name)
201	}
202	/// 有参数无返回值的函数调用
203	#[inline]
204	pub async fn call_with_arg<Args>(&self, name: &str, args: Args) -> Result<(), Error>
205	where
206		Args: serde::ser::Serialize,
207	{
208		let (id, data) = self.encode_with_arg(name, args);
209		let msg = self.stream.call(id, data).await?;
210		Self::decode_without_ret(msg, name)
211	}
212	/// 有参数也有返回值的函数调用
213	#[inline]
214	pub async fn call_with_arg_ret<Args, Ret>(&self, name: &str, args: Args) -> Result<Ret, Error>
215	where
216		Args: serde::ser::Serialize,
217		Ret: for<'a> serde::de::Deserialize<'a>,
218	{
219		let (id, data) = self.encode_with_arg(name, args);
220		let msg = self.stream.call(id, data).await?;
221		Self::decode_with_ret(msg, name)
222	}
223	/// 订阅主题,通过lambda函数处理订阅数据
224	pub async fn subcribe_with_lambda<Ret, F>(&self, topic: &str, mut f: F) -> Result<(), Error>
225	where
226		Ret: for<'a> serde::de::Deserialize<'a>,
227		F: FnMut(Ret),
228	{
229		loop {
230			let id = self.id.fetch_add(1, Ordering::SeqCst);
231			let msg = Msg::new(id, topic);
232			let data = msg.encode_without_body(Mode::Subcribe);
233			let msg = self.stream.call(id, data).await?;
234			if msg.name() != topic.as_bytes() {
235				break Err(Error::new("订阅主题名称不匹配"));
236			}
237			match msg.mode() {
238				Mode::Publish => {
239					if msg.headeronly() {
240						break Err(Error::new("没有订阅到消息体"));
241					} else {
242						f(msg.parse()?);
243						continue;
244					}
245				}
246				Mode::NotFound => break Err(Error::new("没有找到相应的函数")),
247				Mode::NotMatch => break Err(Error::new("函数参数不匹配")),
248				Mode::NoAccess => break Err(Error::new("没有权限")),
249				_ => break Err(Error::new("返回消息模式不正确")),
250			}
251		}
252	}
253	/// 订阅主题,通过trait处理订阅数据
254	pub async fn subcribe_with_trait<Ret, T>(&self, topic: &str, t: &mut T) -> Result<(), Error>
255	where
256		Ret: for<'a> serde::de::Deserialize<'a>,
257		T: SubcribeCallback<Ret>,
258	{
259		loop {
260			let id = self.id.fetch_add(1, Ordering::SeqCst);
261			let msg = Msg::new(id, topic);
262			let data = msg.encode_without_body(Mode::Subcribe);
263			let msg = self.stream.call(id, data).await?;
264			if msg.name() != topic.as_bytes() {
265				break Err(Error::new("订阅主题名称不匹配"));
266			}
267			match msg.mode() {
268				Mode::Publish => {
269					if msg.headeronly() {
270						break Err(Error::new("没有订阅到消息体"));
271					} else {
272						if t.callback(msg.parse()?) {
273							continue; //callback返回true说明需要一直订阅
274						} else {
275							break Ok(()); //callback返回false说明只订阅一次直接退出loop
276						}
277					}
278				}
279				Mode::NotFound => break Err(Error::new("没有找到相应的函数")),
280				Mode::NotMatch => break Err(Error::new("函数参数不匹配")),
281				Mode::NoAccess => break Err(Error::new("没有权限")),
282				_ => break Err(Error::new("返回消息模式不正确")),
283			}
284		}
285	}
286}
287
288/// TCP client
289pub type TCPClient = Client<MyStream>;
290/// unix client
291#[cfg(unix)]
292pub type UnixClient = Client<MyStream>;
293
294/// 创建TCP客户端
295#[inline]
296pub async fn new_tcp_client(addr: &str) -> std::io::Result<TCPClient> {
297	Ok(Client::new(MyStream::new(TcpStream::connect(addr).await?)))
298}
299/// unix客户端
300#[inline]
301#[cfg(unix)]
302pub async fn new_unix_client(path: &str) -> std::io::Result<UnixClient> {
303	Ok(Client::new(MyStream::new(UnixStream::connect(path).await?)))
304}
305
306/// # RPC调用
307/// - 第一个参数表示使用的传输协议,可选值**tcp**:使用tcp通信, **unix**:使用unix域套接字通信)
308/// - 第二个参数为通信地址,类型为<font color=blue>&str</font>,例如:`"127.0.0.1:9000"`,`"/tmp/local/unix"`
309/// - 剩下的参数为函数调用,格式如下所示:
310/// 	- `rpc1()`,无参数无返回值
311/// 	- `rpc2(id:i32)`,有参数无返回值
312/// 	- `rpc3()->i32`,无参数有返回值
313/// 	- `rpc4(a:i32,b:bool)->String`,既有参数也有返回值
314/// - 返回值类型为**Result<R,Error>**,R为返回值类型,当无返回值时R为()
315/// # example
316/// ```
317/// let _ = call!(unix, "/tmp/local/unix").await?;	//心跳检测
318/// let _ = call!(unix, "/tmp/local/unix", test_notargs_and_notret()).await?;
319/// let _ = call!(unix, "/tmp/local/unix", test_notret("hello client")).await?;
320/// let ret = call!(tcp, "127.0.0.1:9000", test_notargs() -> String).await?;
321/// assert_eq!(&ret, "test_notargs called!");
322/// let ret = call!(tcp, "127.0.0.1:9000", test_has_args_and_ret(10000, 1000, 10, true) -> String).await?;
323/// assert_eq!(&ret, "10000 1000 10 true");
324/// ```
325#[macro_export]
326macro_rules! call {
327	/* 以下四个匹配用以匹配函数名有空格的情况,匹配模型如下所示,若有需要可打开注释
328	let _ = call!(tcp, "127.0.0.1:9000", "test notargs and notret").await?;
329	let _ = call!(tcp, "127.0.0.1:9000", "test notret", ("hello client",)).await?;
330	let ret = call!(tcp, "127.0.0.1:9000", "test notargs" -> String).await?;
331	let ret = call!(tcp, "127.0.0.1:9000", "test has args and ret", (10000, 1000, 10, true) -> String).await?;
332	//无参数且无返回值
333	(@call $func:ident, $addr:tt, $name:tt) => {
334		async move {
335			match $crate::$func($addr).await {
336				Ok(client) => client.call_without_arg_ret($name).await,
337				Err(e) => Err($crate::msg::Error::from(e)),
338			}
339		}
340	};
341	//无参数有返回值
342	(@call $func:ident, $addr:tt, $name:tt -> $ret:ty) => {
343		async move {
344			match $crate::$func($addr).await {
345				Ok(client) => {
346					let result: $ret = client.call_with_ret($name).await?;
347					Ok(result)
348				}
349				Err(e) => Err($crate::msg::Error::from(e)),
350			}
351		}
352	};
353	//有参数无返回值
354	(@call $func:ident, $addr:tt, $name:tt, $args:tt) => {
355		async move {
356			match $crate::$func($addr).await {
357				Ok(client) => client.call_with_arg($name, $args).await,
358				Err(e) => Err($crate::msg::Error::from(e)),
359			}
360		}
361	};
362	//有参数有返回值
363	(@call $func:ident, $addr:tt, $name:tt, $args:tt -> $ret:ty) => {
364		async move {
365			match $crate::$func($addr).await {
366				Ok(client) => {
367					let result: $ret = client.call_with_arg_ret($name, $args).await?;
368					Ok(result)
369				}
370				Err(e) => Err($crate::msg::Error::from(e)),
371			}
372		}
373	};
374 */
375	//心跳检测
376	(@call $connect:ident, $addr:expr) => {
377		async move {
378			match $crate::$connect($addr).await {
379				Ok(client) => client.heartbeat().await,
380				Err(e) => Err($crate::msg::Error::from(e)),
381			}
382		}
383	};
384	//无参数且无返回值
385	(@call $connect:ident, $addr:expr, $func:ident()) => {
386		async move {
387			match $crate::$connect($addr).await {
388				Ok(client) => client.call_without_arg_ret(stringify!($func)).await,
389				Err(e) => Err($crate::msg::Error::from(e)),
390			}
391		}
392	};
393	//无参数有返回值
394	(@call $connect:ident, $addr:expr, $func:ident() -> $ret:ty) => {
395		async move {
396			match $crate::$connect($addr).await {
397				Ok(client) => {
398					let result: $ret = client.call_with_ret(stringify!($func)).await?;
399					Ok(result)
400				}
401				Err(e) => Err($crate::msg::Error::from(e)),
402			}
403		}
404	};
405	//有参数无返回值
406	(@call $connect:ident, $addr:expr, $func:ident($($arg:expr),+)) => {
407		async move {
408			match $crate::$connect($addr).await {
409				Ok(client) => client.call_with_arg(stringify!($func), ($($arg,)+)).await,
410				Err(e) => Err($crate::msg::Error::from(e)),
411			}
412		}
413	};
414	//有参数有返回值
415	(@call $connect:ident, $addr:expr, $func:ident($($arg:expr),+) -> $ret:ty) => {
416		async move {
417			match $crate::$connect($addr).await {
418				Ok(client) => {
419					let result: $ret = client.call_with_arg_ret(stringify!($func), ($($arg,)+)).await?;
420					Ok(result)
421				}
422				Err(e) => Err($crate::msg::Error::from(e)),
423			}
424		}
425	};
426	(tcp, $($var:tt)+) => {
427		async move {
428			call!(@call new_tcp_client, $($var)+).await
429		}
430	};
431	(unix, $($var:tt)+) => {
432		async move {
433			call!(@call new_unix_client, $($var)+).await
434		}
435	};
436}
437
438/// # 订阅主题
439/// - 第一个参数表示使用的传输协议,可选值**tcp**:使用tcp通信, **unix**:使用unix域套接字通信)
440/// - 第二个参数为通信地址,类型为<font color=blue>&str</font>,例如:`"127.0.0.1:9000"`,`"/tmp/local/unix"`
441/// - 第三个参数为订阅主题,类型为<font color=blue>&str</font>,例如:`"onMessage"`
442/// - 第四个参数为订阅数据回调处理,支持两种格式:
443/// 	- `|id:i32|{})`,lambda回调处理,lambda函数可以有多个参数但每个参数需实现**serde::de::Deserialize**,另外lambda函数返回值类型为<font color=blue>()</font>,且lambda函数体**需用`{}`包裹**
444/// 	- `var`,var类型为**&mut**,且需要实现**SubcribeCallback<T>**,T实现**serde::de::Deserialize**,T即为订阅的数据类型
445/// - 返回值类型为**Result<(),Error>**,<font color=red>此订阅会一直运行只有出错时才会返回</font>
446/// # example
447/// ```
448/// //通过lambda进行回调处理
449/// let _ = subcribe!(unix, "/tmp/local/unix", "sub", |s: String, v: i32| {
450/// 	println!("({}, {})", s, v);
451/// }).await;
452///
453/// //实现SubcribeCallback trait进行回调处理
454/// struct MySub;
455/// impl SubcribeCallback<(String, i32)> for MySub {
456/// 	fn callback(&mut self, data: (String, i32)) -> bool {
457/// 		false
458/// 	}
459/// }
460///
461/// let mut sub = MySub;
462/// let _ = subcribe!(unix, "/tmp/local/unix", "sub", sub).await;
463/// ```
464#[macro_export]
465macro_rules! subcribe {
466	//订阅主题并通过lambda函数回调处理
467	(@sub $connect:ident, $addr:expr, $topic:expr, |$($arg:ident:$argType:ty),+|$body:block) => {
468		async move {
469			match $crate::$connect($addr).await {
470				Ok(client) => client.subcribe_with_lambda($topic, |($($arg,)+):($($argType,)+)|$body).await,
471				Err(e) => Err($crate::msg::Error::from(e)),
472			}
473		}
474	};
475	//订阅主题并通过实现SubcribeCallback trait回调处理
476	(@sub $connect:ident, $addr:expr, $topic:expr, $var:expr) => {
477		async move {
478			match $crate::$connect($addr).await {
479				Ok(client) => client.subcribe_with_trait($topic, $var).await,
480				Err(e) => Err($crate::msg::Error::from(e)),
481			}
482		}
483	};
484	(tcp, $($var:tt)+) => {
485		async move {
486			subcribe!(@sub new_tcp_client, $($var)+).await
487		}
488	};
489	(unix, $($var:tt)+) => {
490		async move {
491			subcribe!(@sub new_unix_client, $($var)+).await
492		}
493	};
494}
495
496/// 用以定义新类型
497#[macro_export]
498macro_rules! define_new_type {
499	//定义无参数无返回值方法
500	(@method fn $name:ident$(<$generic:tt>)?()) => {
501		pub async fn $name$(<$generic>)?(&self) -> Result<(), $crate::msg::Error> {
502			self.0.call_without_arg_ret(stringify!($name)).await
503		}
504	};
505	//定义有参数无返回值方法
506	(@method fn $name:ident$(<$generic:tt>)?($($arg:ident:$argType:ty,)+)) => {
507		pub async fn $name$(<$generic>)?(&self, $($arg:$argType,)+) -> Result<(), $crate::msg::Error>
508		where
509			$($argType: serde::ser::Serialize,)+
510		{
511			self.0.call_with_arg(stringify!($name), ($($arg,)+)).await
512		}
513	};
514	//定义无参数有返回值方法
515	(@method fn $name:ident$(<$generic:tt>)?()->$ret:ty) => {
516		pub async fn $name$(<$generic>)?(&self) -> Result<$ret, $crate::msg::Error>
517		where
518			$ret: for<'a> serde::de::Deserialize<'a>,
519		{
520			self.0.call_with_ret(stringify!($name)).await
521		}
522	};
523	//定义有参数有返回值方法
524	(@method fn $name:ident$(<$generic:tt>)?($($arg:ident:$argType:ty,)+)->$ret:ty) => {
525		pub async fn $name$(<$generic>)?(&self, $($arg:$argType,)+) -> Result<$ret, $crate::msg::Error>
526		where
527			$($argType: serde::ser::Serialize,)+
528			$ret: for<'a> serde::de::Deserialize<'a>,
529		{
530			self.0.call_with_arg_ret(stringify!($name), ($($arg,)+)).await
531		}
532	};
533	//定义sub onOpen(topic:&str,f:impl FnMut(ArgType))格式的订阅方法
534	(@method sub $name:ident($topic:ident:$topicType:ty, $f:ident:$(impl)? FnMut($ArgType:ty))) => {
535		pub async fn $name<F>(&self, $topic:$topicType, $f:F) -> Result<(), $crate::msg::Error>
536		where
537			F: FnMut($ArgType),
538			$topicType: Deref<Target = str>,
539			$ArgType: for<'a> serde::de::Deserialize<'a>,
540		{
541			self.0.subcribe_with_lambda(&$topic, $f).await
542		}
543	};
544	//定义sub onClose(topic:&str,var:&mut V)格式的订阅方法
545	(@method sub $name:ident($topic:ident:$topicType:ty, $var:ident:&mut $ArgType:ty)) => {
546		pub async fn $name<Ret>(&self, $topic:&str, $var:&mut $ArgType) -> Result<(), $crate::msg::Error>
547		where
548			Ret: for<'a> serde::de::Deserialize<'a>,
549			$topicType: Deref<Target = str>,
550			$ArgType: SubcribeCallback<Ret>,
551		{
552			self.0.subcribe_with_trait(&$topic, $var).await
553		}
554	};
555	//定义只有fn方法声明的新类型
556	($f:ident, $t:ident, $StructName:ident, $(fn $name:ident$(<$generic:tt>)?($($arg:ident:$argType:ty),*)$(->$ret:ty)?),+) => {
557		struct $StructName($t);
558		impl $StructName {
559			pub async fn new(path:&str) -> std::io::Result<Self> {
560				Ok(Self($f(path).await?))
561			}
562			pub async fn heartbeat(&self) -> Result<(), $crate::msg::Error> {
563				self.0.heartbeat().await
564			}
565			$(define_new_type!(@method fn $name$(<$generic>)?($($arg:$argType,)*)$(->$ret)?);)+
566		}
567	};
568	//定义只有sub方法声明的新类型
569	($f:ident, $t:ident, $StructName:ident, $(sub $name:ident($topic:ident:$topicType:ty, $arg:ident:$($argType:tt)+)),+) => {
570		struct $StructName($t);
571		impl $StructName {
572			pub async fn new(path:&str) -> std::io::Result<Self> {
573				Ok(Self($f(path).await?))
574			}
575			pub async fn heartbeat(&self) -> Result<(), $crate::msg::Error> {
576				self.0.heartbeat().await
577			}
578			$(define_new_type!(@method sub $name($topic:$topicType,$arg:$($argType)+));)+
579		}
580	};
581	//定义同时包含fn和sub方法声明的新类型
582	($f:ident, $t:ident, $StructName:ident, $(fn $name:ident($($arg:ident:$argType:ty),*)$(->$ret:ty)?),+, $(sub $name2:ident($topic:ident:$topicType:ty, $arg2:ident:$($argType2:tt)+)),+) => {
583		struct $StructName($t);
584		impl $StructName {
585			pub async fn new(path:&str) -> std::io::Result<Self> {
586				Ok(Self($f(path).await?))
587			}
588			pub async fn heartbeat(&self) -> Result<(), $crate::msg::Error> {
589				self.0.heartbeat().await
590			}
591			$(define_new_type!(@method fn $name($($arg:$argType,)*)$(->$ret)?);)+
592			$(define_new_type!(@method sub $name2($topic:$topicType,$arg2:$($argType2)+));)+
593		}
594	};
595}
596
597/// # 定义RPC Client的新类型
598/// - 第一个参数表示使用的传输协议,可选值**tcp**:使用tcp通信, **unix**:使用unix域套接字通信)
599/// - 第二个参数为新类型的名称,例如`MyStruct`
600/// - 剩下的参数为新类型的方法声明(可以声明多个),支持格式如下:
601/// 	- `fn rpc1()`,无参数无返回值声明
602/// 	- `fn rpc2(id:i32)`,有参数无返回值声明
603/// 	- `fn rpc3()->i32`,无参数有返回值声明
604/// 	- `fn rpc4(a:i32,b:bool)->String`,既有参数也有返回值声明
605/// 	- `sub onOpen(topic:TopicType,f:impl FnMut(ArgType))`,订阅主题通过lambda函数回调处理,其中TopicType必须实现**Deref<Target = str>**,ArgType需实现**serde::de::Deserialize**,另外lambda函数返回值类型为**()**
606/// 	- `sub onClose(topic:TopicType,var:&mut V)`,订阅主题通过SubcribeCallback trait处理,其中TopicType必须实现**Deref<Target = str>**,var类型必须为**&mut**,V需要实现**SubcribeCallback<T>**,T实现**serde::de::Deserialize**,T即为订阅的数据类型
607/// - 注意:
608/// 	- 此宏会默认一个pub async fn new(path:&str) -> std::io::Result<Self>的方法,用以创建新定义的类型
609/// 	- 此宏会默认一个pub async fn heartbeat(&self) -> Result<(), Error>的方法,用以检测服务端是否存在
610/// 	- 不需要在方法参数表第一个参数指明self,此宏会自己生成
611/// 	- 当同时存在fn与sub方法声明时,以**fn**开头的所有方法声明必须在以**sub**的前面,即先**fn声明在前,sub声明在后**
612/// # example
613/// ```
614/// //实现SubcribeCallback trait进行回调处理
615/// struct MySub;
616/// impl SubcribeCallback<(String, i32)> for MySub {
617/// 	fn callback(&mut self, data: (String, i32)) -> bool {
618/// 		false
619/// 	}
620/// }
621/// 只定义rpc方法
622/// define!(unix, Test1,
623/// 	fn rpc1(),
624/// 	fn rpc2(id:i32),
625/// 	fn rpc3()->i32,
626/// 	fn rpc4(a:i32,b:bool)->String
627/// );
628/// 只定义订阅方法
629/// define!(unix, Test2,
630/// 	sub onOpen(topic:&'static str, f:impl FnMut((String,i32))),
631/// 	sub onClose(topic:&'static str, var:&mut MySub)
632/// );
633/// 定义rpc方法和订阅方法
634/// define!(unix, Test,
635/// 	fn rpc1(),
636/// 	fn rpc2(id:i32),
637/// 	fn rpc3()->i32,
638/// 	fn rpc4(a:i32,b:bool)->String,
639/// 	sub onOpen(topic:&'static str, f:impl FnMut((String,i32))),
640/// 	sub onClose(topic:&'static str, var:&mut MySub)
641/// );
642/// //如上定义会生成一个新类型Test,包含方法rpc1、rpc2、rpc3、rpc4、onOpen、onClose生成的新类型代码大致如下所示:
643/// struct Test(T);
644/// impl Test {
645/// 	pub async fn new(path:&str) -> std::io::Result<Self> {...} //默认生成的方法
646/// 	pub async fn heartbeat(&self) -> Result<(), Error> {...} //默认生成的方法,用以检测服务端是否存在
647/// 	pub async fn rpc1(&self) -> Result<(), Error> {...}
648/// 	pub async fn rpc2(&self, id:i32) -> Result<(), Error> {...}
649/// 	pub async fn rpc3(&self) -> Result<i32, Error> {...}
650/// 	pub async fn rpc4(&self, a:i32, b:bool) -> Result<String, Error> {...}
651/// 	pub async fn onOpen(topic:&'static str, f:impl FnMut(String,i32)) -> Result<(), Error> {...} //此方法会一直执行直到出错才返回
652/// 	pub async fn onClose(topic:&'static str, var:&mut MySub) -> Result<(), Error> {...} //此方法若var.callback()的返回值是true就一直执行直到出错才返回,若返回值是false则立即返回Ok(())
653/// }
654/// ```
655#[macro_export]
656macro_rules! define {
657	(tcp, $($var:tt)+) => {
658		define_new_type!(new_tcp_client, TCPClient, $($var)+);
659	};
660	(unix, $($var:tt)+) => {
661		define_new_type!(new_unix_client, UnixClient, $($var)+);
662	};
663}