1extern crate rmp_serde as rmps;
2use std::collections::HashMap;
3use std::future::Future;
4
5use super::msg::*;
6#[cfg(unix)]
7use std::path::Path;
8use std::pin::Pin;
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10use tokio::net::TcpListener;
11#[cfg(unix)]
12use tokio::net::UnixListener;
13use tokio::sync::mpsc;
14
15include!("publish.rs");
16include!("session.rs");
17
18pub type AsyncCallback = Pin<Box<dyn Future<Output = Bytes> + Send>>;
19pub type Callback = Box<dyn Fn(Msg) -> AsyncCallback + Send + Sync>;
20
/// Turns closure-like syntax into a boxed handler closure of the shape
/// `Fn(Msg) -> AsyncCallback`.
///
/// Every form may start with `clone!(a, b),` (also accepted as
/// `krpc::clone!(a, b),`). The listed identifiers are cloned once when the
/// handler is built and once more on every invocation, so each call works on
/// its own copies. The `async` and `move` keywords are accepted before the
/// parameter list and ignored.
///
/// Supported closure shapes:
///
/// * `|| { .. }` runs the body and replies with `Mode::Respond` and no body.
/// * `|a: A, ..| { .. }` first parses the message as the tuple `(A, ..)`;
///   if parsing fails the reply is `Mode::NotMatch` with no body.
/// * `|| -> R { .. }` / `|a: A, ..| -> R { .. }` additionally bind the body's
///   value as `R` and encode it into the `Mode::Respond` reply.
///
/// Prefixing any shape with `@with_return` wraps the body in its own inner
/// `async move` block that is awaited, so a `return` inside the body ends
/// only that inner block and the reply is still encoded afterwards.
#[macro_export]
macro_rules! callback {
    // ------------------------------------------------------------------
    // Plain forms: the body is spliced straight into the reply future.
    // ------------------------------------------------------------------

    // No parameters, no return value.
    ($($(krpc::)?clone!($($cap:ident),*),)? $(async)? $(move)? || $body:block) => {{
        // Clone at construction time so the caller keeps its originals.
        $($(let $cap = $cap.clone();)*)?
        Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
            // Clone again per call: the closure is `Fn` and cannot give up its captures.
            $($(let $cap = $cap.clone();)*)?
            Box::pin(async move {
                $body;
                msg.encode_without_body($crate::msg::Mode::Respond)
            })
        })
    }};

    // Typed parameters, no return value.
    ($($(krpc::)?clone!($($cap:ident),*),)? $(async)? $(move)? |$($arg:ident :$arg_ty:ty),+| $body:block) => {{
        $($(let $cap = $cap.clone();)*)?
        Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
            $($(let $cap = $cap.clone();)*)?
            Box::pin(async move {
                let parsed = msg.parse::<($($arg_ty,)+)>();
                match parsed {
                    Ok(($($arg,)+)) => {
                        $body;
                        msg.encode_without_body($crate::msg::Mode::Respond)
                    }
                    // The payload did not decode as the declared parameter tuple.
                    Err(_) => msg.encode_without_body($crate::msg::Mode::NotMatch),
                }
            })
        })
    }};

    // No parameters, typed return value.
    ($($(krpc::)?clone!($($cap:ident),*),)? $(async)? $(move)? || -> $ret:ty $body:block) => {{
        $($(let $cap = $cap.clone();)*)?
        Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
            $($(let $cap = $cap.clone();)*)?
            Box::pin(async move {
                let reply: $ret = $body;
                msg.encode($crate::msg::Mode::Respond, &reply)
            })
        })
    }};

    // Typed parameters, typed return value.
    ($($(krpc::)?clone!($($cap:ident),*),)? $(async)? $(move)? |$($arg:ident :$arg_ty:ty),+| -> $ret:ty $body:block) => {{
        $($(let $cap = $cap.clone();)*)?
        Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
            $($(let $cap = $cap.clone();)*)?
            Box::pin(async move {
                let parsed = msg.parse::<($($arg_ty,)+)>();
                match parsed {
                    Ok(($($arg,)+)) => {
                        let reply: $ret = $body;
                        msg.encode($crate::msg::Mode::Respond, &reply)
                    }
                    Err(_) => msg.encode_without_body($crate::msg::Mode::NotMatch),
                }
            })
        })
    }};

    // ------------------------------------------------------------------
    // `@with_return` forms: the body runs inside its own awaited
    // `async move` block before the reply is encoded.
    // ------------------------------------------------------------------

    // No parameters, no return value.
    (@with_return $($(krpc::)?clone!($($cap:ident),*),)? $(async)? $(move)? || $body:block) => {{
        $($(let $cap = $cap.clone();)*)?
        Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
            $($(let $cap = $cap.clone();)*)?
            Box::pin(async move {
                let inner = async move { $body };
                inner.await;
                msg.encode_without_body($crate::msg::Mode::Respond)
            })
        })
    }};

    // Typed parameters, no return value.
    (@with_return $($(krpc::)?clone!($($cap:ident),*),)? $(async)? $(move)? |$($arg:ident :$arg_ty:ty),+| $body:block) => {{
        $($(let $cap = $cap.clone();)*)?
        Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
            $($(let $cap = $cap.clone();)*)?
            Box::pin(async move {
                let parsed = msg.parse::<($($arg_ty,)+)>();
                match parsed {
                    Ok(($($arg,)+)) => {
                        let inner = async move { $body };
                        inner.await;
                        msg.encode_without_body($crate::msg::Mode::Respond)
                    }
                    Err(_) => msg.encode_without_body($crate::msg::Mode::NotMatch),
                }
            })
        })
    }};

    // No parameters, typed return value.
    (@with_return $($(krpc::)?clone!($($cap:ident),*),)? $(async)? $(move)? || -> $ret:ty $body:block) => {{
        $($(let $cap = $cap.clone();)*)?
        Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
            $($(let $cap = $cap.clone();)*)?
            Box::pin(async move {
                let inner = async move { $body };
                let reply: $ret = inner.await;
                msg.encode($crate::msg::Mode::Respond, &reply)
            })
        })
    }};

    // Typed parameters, typed return value.
    (@with_return $($(krpc::)?clone!($($cap:ident),*),)? $(async)? $(move)? |$($arg:ident :$arg_ty:ty),+| -> $ret:ty $body:block) => {{
        $($(let $cap = $cap.clone();)*)?
        Box::new(move |msg: $crate::msg::Msg| -> $crate::AsyncCallback {
            $($(let $cap = $cap.clone();)*)?
            Box::pin(async move {
                let parsed = msg.parse::<($($arg_ty,)+)>();
                match parsed {
                    Ok(($($arg,)+)) => {
                        let inner = async move { $body };
                        let reply: $ret = inner.await;
                        msg.encode($crate::msg::Mode::Respond, &reply)
                    }
                    Err(_) => msg.encode_without_body($crate::msg::Mode::NotMatch),
                }
            })
        })
    }};
}
165
166pub struct Server {
168 funcs: HashMap<&'static [u8], Callback>,
169 sub_tx: mpsc::Sender<Content>,
170 sub_rx: mpsc::Receiver<Content>,
171 call_tx: Option<mpsc::Sender<Pair>>,
172 call_rx: Option<mpsc::Receiver<Pair>>,
173}
174
// Shared body of `run_by_unix` and `run_by_tcp`.
//
// Arguments:
//   $self         - the `Server` being consumed (must be a `mut` binding),
//   $listener     - a bound listener whose `accept()` yields a splittable stream,
//   $on_subscribe - a `mut` binding holding the subscribe hook handed to `PubSub::add`.
//
// The expansion spawns up to two background tasks and then accepts
// connections forever; it only leaves the enclosing function through `?`
// when `accept()` fails, so the enclosing function must return an
// `io::Result`.
macro_rules! listen {
    ($self:ident, $listener:ident, $on_subscribe:ident) => {
        // Dispatch task: exists only if `bind` created the call channel.
        if let Some(mut call_rx) = $self.call_rx.take() {
            let funcs = $self.funcs;
            tokio::spawn(async move {
                // Ends once every sender of the call channel has been dropped.
                while let Some(Pair { msg, tx }) = call_rx.recv().await {
                    let reply = match funcs.get(msg.name()) {
                        Some(f) => f(msg).await,
                        // No handler was bound under this name.
                        None => msg.encode_without_body(Mode::NotFound),
                    };
                    // The requester may already be gone; a failed send is ignored.
                    let _ = tx.send(reply).await;
                }
            });
        }

        // Pub/sub task: owns the subscription table and serialises all changes to it.
        let mut sub_rx = $self.sub_rx;
        tokio::spawn(async move {
            let mut ps = PubSub { subs: HashMap::new() };
            while let Some(content) = sub_rx.recv().await {
                match content {
                    Content::Sub(id, pair) => ps.add(id, pair, &mut $on_subscribe).await,
                    Content::Pub(name, data) => ps.publish(name, data).await,
                    Content::Del(id) => ps.del(id),
                }
            }
        });

        // Accept loop: one reader task and one writer task per connection.
        let mut id = 0u32;
        loop {
            let (stream, _) = $listener.accept().await?;
            let (reader, writer) = stream.into_split();
            let (tx, rx) = mpsc::channel(3);
            // Connection ids start at 1. Wrap explicitly: a plain `+= 1` panics
            // in overflow-checked builds after 2^32 accepted connections, which
            // would take the whole accept loop down.
            id = id.wrapping_add(1);

            let sub_tx = $self.sub_tx.clone();
            let call_tx = $self.call_tx.clone();
            tokio::spawn(async move {
                // `recv` consumes `sub_tx`, so keep a second handle for the
                // clean-up message sent after the reader finishes.
                let del_tx = sub_tx.clone();
                let _ = recv(reader, sub_tx, call_tx, tx, id).await;
                let _ = del_tx.send(Content::Del(id)).await;
            });
            tokio::spawn(send(writer, rx));
        }
    };
}
225
226impl Server {
227 #[inline]
229 pub fn new() -> Self {
230 let (stx, srx) = mpsc::channel(3);
231 Server { funcs: HashMap::new(), sub_tx: stx, sub_rx: srx, call_tx: None, call_rx: None }
232 }
233 #[inline]
235 pub fn bind(&mut self, name: &'static str, f: Callback) {
236 if self.call_rx.is_none() {
237 let (ctx, crx) = mpsc::channel(3);
238 self.call_tx = Some(ctx);
239 self.call_rx = Some(crx);
240 }
241 self.funcs.insert(name.as_bytes(), f);
242 }
243 #[inline]
245 pub fn publisher(&self) -> Publisher {
246 Publisher { tx: self.sub_tx.clone() }
247 }
248 #[cfg(unix)]
250 pub async fn run_by_unix(
251 mut self, path: &str, mut on_subcribe: impl FnMut(&str) -> Option<Bytes> + Send + Sync + 'static,
252 ) -> std::io::Result<()> {
253 let path = Path::new(path);
254 if path.exists() {
255 std::fs::remove_file(path)?;
256 } else {
257 match path.parent() {
258 Some(dir) if !dir.exists() => std::fs::create_dir_all(dir)?,
259 _ => (),
260 }
261 }
262 let listener = UnixListener::bind(path)?;
263 listen!(self, listener, on_subcribe);
264 }
265 pub async fn run_by_tcp(
267 mut self, addr: &str, mut on_subcribe: impl FnMut(&str) -> Option<Bytes> + Send + Sync + 'static,
268 ) -> std::io::Result<()> {
269 let listener = TcpListener::bind(addr).await?;
270 listen!(self, listener, on_subcribe);
271 }
272 #[cfg(unix)]
274 pub async fn run(self, addr: &str) -> std::io::Result<()> {
275 self.run_by_unix(addr, |_| None).await
276 }
277 #[cfg(unix)]
278 pub async fn run_with(
279 self, addr: &str, on_subcribe: impl FnMut(&str) -> Option<Bytes> + Send + Sync + 'static,
280 ) -> std::io::Result<()> {
281 self.run_by_unix(addr, on_subcribe).await
282 }
283 #[cfg(not(unix))]
284 pub async fn run(self, addr: &str) -> std::io::Result<()> {
285 self.run_by_tcp(addr, |_| None).await
286 }
287 #[cfg(not(unix))]
288 pub async fn run_with(
289 self, addr: &str, on_subcribe: impl FnMut(&str) -> Option<Bytes> + Send + Sync + 'static,
290 ) -> std::io::Result<()> {
291 self.run_by_tcp(addr, on_subcribe).await
292 }
293}