krpc/
server.rs

1extern crate rmp_serde as rmps;
2use std::collections::HashMap;
3use std::future::Future;
4
5use super::msg::*;
6#[cfg(unix)]
7use std::path::Path;
8use std::pin::Pin;
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10use tokio::net::TcpListener;
11#[cfg(unix)]
12use tokio::net::UnixListener;
13use tokio::sync::mpsc;
14
15include!("publish.rs");
16include!("session.rs");
17
18pub type AsyncCallback = Pin<Box<dyn Future<Output = Bytes> + Send>>;
19pub type Callback = Box<dyn Fn(Msg) -> AsyncCallback + Send + Sync>;
20
21/// 用以声明回调函数,主要用于服务端绑定lambda函数(支持同步和异步)
22///
23/// 支持如下格式:
24/// - 前缀`krpc::clone!(a,...)` `async` `move`都是可选的,如果lambda函数中使用的变量需要在函数体外被使用,需用`clone!`标注这些变量,例如`clone!(tx,var)`
25/// - ||{...},无参数也无返回值
26/// - |id:i32|{...},有参数无返回值
27/// - ||->String {...},无参数有返回值
28/// - |id:i32,s:String|->String {...},有参数且有返回值
29#[macro_export]
30macro_rules! callback {
31	($($(krpc::)?clone!($($var:ident),*),)? $(async)? $(move)? || $b:block) => {
32        //无参数也无返回值
33        {
34            $($(let $var = $var.clone();)*)?
35            Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
36                $($(let $var = $var.clone();)*)?
37                Box::pin(async move {
38                    $b;
39                    msg.encode_without_body($crate::msg::Mode::Respond)
40                })
41            })
42        }
43    };
44    ($($(krpc::)?clone!($($var:ident),*),)? $(async)? $(move)? |$($a:ident :$t:ty),+| $b:block) => {
45        //有参数无返回值
46        {
47            $($(let $var = $var.clone();)*)?
48            Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
49                $($(let $var = $var.clone();)*)?
50                Box::pin(async move {
51                    let body = msg.parse::<($($t,)+)>();
52                    match body {
53                        Ok(($($a,)+)) => {
54                            $b;
55                            msg.encode_without_body($crate::msg::Mode::Respond)
56                        }
57                        Err(_) => msg.encode_without_body($crate::msg::Mode::NotMatch),
58                    }
59                })
60            })
61        }
62    };
63    ($($(krpc::)?clone!($($var:ident),*),)? $(async)? $(move)? || -> $r:ty $b:block) => {
64        //无参数有返回值
65        {
66            $($(let $var = $var.clone();)*)?
67            Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
68                $($(let $var = $var.clone();)*)?
69                Box::pin(async move {
70                    let ret:$r = $b;
71                    msg.encode($crate::msg::Mode::Respond, &ret)
72                })
73            })
74        }
75    };
76    ($($(krpc::)?clone!($($var:ident),*),)? $(async)? $(move)? |$($a:ident :$t:ty),+| -> $r:ty $b:block) => {
77        //有参数且有返回值
78        {
79            $($(let $var = $var.clone();)*)?
80            Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
81                $($(let $var = $var.clone();)*)?
82                Box::pin(async move {
83                    let body = msg.parse::<($($t,)+)>();
84                    match body {
85                        Ok(($($a,)+)) => {
86                            let ret:$r = $b;
87                            msg.encode($crate::msg::Mode::Respond, &ret)
88                        }
89                        Err(_) => msg.encode_without_body($crate::msg::Mode::NotMatch),
90                    }
91                })
92            })
93        }
94    };
95    (@with_return $($(krpc::)?clone!($($var:ident),*),)? $(async)? $(move)? || $b:block) => {
96        //无参数也无返回值且会提前return
97        {
98            $($(let $var = $var.clone();)*)?
99            Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
100                $($(let $var = $var.clone();)*)?
101                Box::pin(async move {
102                    let fu = async move {$b};
103					fu.await;
104                    msg.encode_without_body($crate::msg::Mode::Respond)
105                })
106            })
107        }
108    };
109    (@with_return $($(krpc::)?clone!($($var:ident),*),)? $(async)? $(move)? |$($a:ident :$t:ty),+| $b:block) => {
110        //有参数无返回值且会提前return
111        {
112            $($(let $var = $var.clone();)*)?
113            Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
114                $($(let $var = $var.clone();)*)?
115                Box::pin(async move {
116                    let body = msg.parse::<($($t,)+)>();
117                    match body {
118                        Ok(($($a,)+)) => {
119                            let fu = async move {$b};
120							fu.await;
121							//$b;
122                            msg.encode_without_body($crate::msg::Mode::Respond)
123                        }
124                        Err(_) => msg.encode_without_body($crate::msg::Mode::NotMatch),
125                    }
126                })
127            })
128        }
129    };
130    (@with_return $($(krpc::)?clone!($($var:ident),*),)? $(async)? $(move)? || -> $r:ty $b:block) => {
131        //无参数有返回值且会提前return
132        {
133            $($(let $var = $var.clone();)*)?
134            Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
135                $($(let $var = $var.clone();)*)?
136                Box::pin(async move {
137					let fu = async move {$b};
138                    let ret:$r = fu.await;
139                    msg.encode($crate::msg::Mode::Respond, &ret)
140                })
141            })
142        }
143    };
144    (@with_return $($(krpc::)?clone!($($var:ident),*),)? $(async)? $(move)? |$($a:ident :$t:ty),+| -> $r:ty $b:block) => {
145        //有参数且有返回值且会提前return
146        {
147            $($(let $var = $var.clone();)*)?
148            Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
149                $($(let $var = $var.clone();)*)?
150                Box::pin(async move {
151                    let body = msg.parse::<($($t,)+)>();
152                    match body {
153                        Ok(($($a,)+)) => {
154							let fu = async move {$b};
155		                    let ret:$r = fu.await;
156                            msg.encode($crate::msg::Mode::Respond, &ret)
157                        }
158                        Err(_) => msg.encode_without_body($crate::msg::Mode::NotMatch),
159                    }
160                })
161            })
162        }
163    };
164}
165
166/// RPC服务端
167pub struct Server {
168	funcs: HashMap<&'static [u8], Callback>,
169	sub_tx: mpsc::Sender<Content>,
170	sub_rx: mpsc::Receiver<Content>,
171	call_tx: Option<mpsc::Sender<Pair>>,
172	call_rx: Option<mpsc::Receiver<Pair>>,
173}
174
175/// 用以监听Server
176macro_rules! listen {
177	($self:ident, $listener:ident, $onSubcribe:ident) => {
178		if let Some(mut rx) = $self.call_rx.take() {
179			//用于RPC的调用请求并回复
180			let funcs = $self.funcs;
181			tokio::spawn(async move {
182				loop {
183					let Some(Pair { msg, tx }) = rx.recv().await else {break;};
184					if let Some(f) = funcs.get(msg.name()) {
185						let _ = tx.send(f(msg).await).await;
186					} else {
187						let _ = tx.send(msg.encode_without_body(Mode::NotFound)).await;
188					}
189				}
190			});
191		}
192		//用于接收RPC的订阅请求以及发布信息请求
193		let mut rx = $self.sub_rx;
194		tokio::spawn(async move {
195			let mut ps = PubSub { subs: HashMap::new() };
196			loop {
197				let Some(c) = rx.recv().await else {break;};
198				match c {
199					Content::Sub(id, pair) => ps.add(id, pair, &mut $onSubcribe).await,
200					Content::Pub(name, data) => ps.publish(name, data).await,
201					Content::Del(id) => ps.del(id),
202				}
203			}
204		});
205		//用于接收RPC客户端连接
206		let mut id = 0u32;
207		loop {
208			let (stream, _) = $listener.accept().await?;
209			let (readstream, writestream) = stream.into_split();
210			let (tx, rx) = mpsc::channel(3); //创建用于发送的通道
211			id += 1;
212			{
213				let sub_tx = $self.sub_tx.clone();
214				let call_tx = $self.call_tx.clone();
215				tokio::spawn(async move {
216					let _sub_tx = sub_tx.clone();
217					let _ = recv(readstream, sub_tx, call_tx, tx, id).await;
218					let _ = _sub_tx.send(Content::Del(id)).await;
219				});
220			}
221			tokio::spawn(send(writestream, rx));
222		}
223	};
224}
225
226impl Server {
227	/// 创建一个RPC服务
228	#[inline]
229	pub fn new() -> Self {
230		let (stx, srx) = mpsc::channel(3);
231		Server { funcs: HashMap::new(), sub_tx: stx, sub_rx: srx, call_tx: None, call_rx: None }
232	}
233	/// 绑定RPC回调函数,请使用callback!宏必要时结合clone!宏进行bind操作
234	#[inline]
235	pub fn bind(&mut self, name: &'static str, f: Callback) {
236		if self.call_rx.is_none() {
237			let (ctx, crx) = mpsc::channel(3);
238			self.call_tx = Some(ctx);
239			self.call_rx = Some(crx);
240		}
241		self.funcs.insert(name.as_bytes(), f);
242	}
243	/// 获取发布器用于发布消息
244	#[inline]
245	pub fn publisher(&self) -> Publisher {
246		Publisher { tx: self.sub_tx.clone() }
247	}
248	/// 通过unix域套接字异步运行服务端监听指定的路径
249	#[cfg(unix)]
250	pub async fn run_by_unix(
251		mut self, path: &str, mut on_subcribe: impl FnMut(&str) -> Option<Bytes> + Send + Sync + 'static,
252	) -> std::io::Result<()> {
253		let path = Path::new(path);
254		if path.exists() {
255			std::fs::remove_file(path)?;
256		} else {
257			match path.parent() {
258				Some(dir) if !dir.exists() => std::fs::create_dir_all(dir)?,
259				_ => (),
260			}
261		}
262		let listener = UnixListener::bind(path)?;
263		listen!(self, listener, on_subcribe);
264	}
265	/// 通过tcp套接字异步运行服务端监听指定的地址
266	pub async fn run_by_tcp(
267		mut self, addr: &str, mut on_subcribe: impl FnMut(&str) -> Option<Bytes> + Send + Sync + 'static,
268	) -> std::io::Result<()> {
269		let listener = TcpListener::bind(addr).await?;
270		listen!(self, listener, on_subcribe);
271	}
272	/// 异步运行服务端监听指定的地址,如果是类unix系统则使用unix域套接字通信,如果是其他系统则使用tcp通信
273	#[cfg(unix)]
274	pub async fn run(self, addr: &str) -> std::io::Result<()> {
275		self.run_by_unix(addr, |_| None).await
276	}
277	#[cfg(unix)]
278	pub async fn run_with(
279		self, addr: &str, on_subcribe: impl FnMut(&str) -> Option<Bytes> + Send + Sync + 'static,
280	) -> std::io::Result<()> {
281		self.run_by_unix(addr, on_subcribe).await
282	}
283	#[cfg(not(unix))]
284	pub async fn run(self, addr: &str) -> std::io::Result<()> {
285		self.run_by_tcp(addr, |_| None).await
286	}
287	#[cfg(not(unix))]
288	pub async fn run_with(
289		self, addr: &str, on_subcribe: impl FnMut(&str) -> Option<Bytes> + Send + Sync + 'static,
290	) -> std::io::Result<()> {
291		self.run_by_tcp(addr, on_subcribe).await
292	}
293}