1mod classified_node;
2mod node;
3
4use crate::client::{CmdSend, SendGuard};
5use crate::cmd::CmdBodyRead;
6use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
7use crate::{
8 CmdBody, CmdHandler, CmdHeader, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId,
9};
10use async_named_locker::ObjectHolder;
11use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
12pub use classified_node::*;
13pub use node::*;
14use num::{FromPrimitive, ToPrimitive};
15use sfo_pool::WorkerClassification;
16use sfo_split::{RHalf, WHalf};
17use std::fmt::Debug;
18use std::hash::Hash;
19use std::ops::DerefMut;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
23use tokio::task::JoinHandle;
24
25#[async_trait::async_trait]
26pub trait CmdNode<
27 LEN: RawEncode
28 + for<'a> RawDecode<'a>
29 + Copy
30 + RawFixedBytes
31 + Sync
32 + Send
33 + 'static
34 + FromPrimitive
35 + ToPrimitive,
36 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
37 M: CmdTunnelMeta,
38 S: CmdSend<M>,
39 G: SendGuard<M, S>,
40>
41{
42 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
43 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
44 async fn send_with_resp(
45 &self,
46 peer_id: &PeerId,
47 cmd: CMD,
48 version: u8,
49 body: &[u8],
50 timeout: Duration,
51 ) -> CmdResult<CmdBody>;
52 async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]])
53 -> CmdResult<()>;
54 async fn send2_with_resp(
55 &self,
56 peer_id: &PeerId,
57 cmd: CMD,
58 version: u8,
59 body: &[&[u8]],
60 timeout: Duration,
61 ) -> CmdResult<CmdBody>;
62 async fn send_cmd(
63 &self,
64 peer_id: &PeerId,
65 cmd: CMD,
66 version: u8,
67 body: CmdBody,
68 ) -> CmdResult<()>;
69 async fn send_cmd_with_resp(
70 &self,
71 peer_id: &PeerId,
72 cmd: CMD,
73 version: u8,
74 body: CmdBody,
75 timeout: Duration,
76 ) -> CmdResult<CmdBody>;
77 async fn send_by_specify_tunnel(
78 &self,
79 peer_id: &PeerId,
80 tunnel_id: TunnelId,
81 cmd: CMD,
82 version: u8,
83 body: &[u8],
84 ) -> CmdResult<()>;
85 async fn send_by_specify_tunnel_with_resp(
86 &self,
87 peer_id: &PeerId,
88 tunnel_id: TunnelId,
89 cmd: CMD,
90 version: u8,
91 body: &[u8],
92 timeout: Duration,
93 ) -> CmdResult<CmdBody>;
94 async fn send2_by_specify_tunnel(
95 &self,
96 peer_id: &PeerId,
97 tunnel_id: TunnelId,
98 cmd: CMD,
99 version: u8,
100 body: &[&[u8]],
101 ) -> CmdResult<()>;
102 async fn send2_by_specify_tunnel_with_resp(
103 &self,
104 peer_id: &PeerId,
105 tunnel_id: TunnelId,
106 cmd: CMD,
107 version: u8,
108 body: &[&[u8]],
109 timeout: Duration,
110 ) -> CmdResult<CmdBody>;
111 async fn send_cmd_by_specify_tunnel(
112 &self,
113 peer_id: &PeerId,
114 tunnel_id: TunnelId,
115 cmd: CMD,
116 version: u8,
117 body: CmdBody,
118 ) -> CmdResult<()>;
119 async fn send_cmd_by_specify_tunnel_with_resp(
120 &self,
121 peer_id: &PeerId,
122 tunnel_id: TunnelId,
123 cmd: CMD,
124 version: u8,
125 body: CmdBody,
126 timeout: Duration,
127 ) -> CmdResult<CmdBody>;
128 async fn clear_all_tunnel(&self);
129 async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<G>;
130}
131
132#[async_trait::async_trait]
133pub trait ClassifiedCmdNode<
134 LEN: RawEncode
135 + for<'a> RawDecode<'a>
136 + Copy
137 + RawFixedBytes
138 + Sync
139 + Send
140 + 'static
141 + FromPrimitive
142 + ToPrimitive,
143 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
144 C: WorkerClassification,
145 M: CmdTunnelMeta,
146 S: CmdSend<M>,
147 G: SendGuard<M, S>,
148>: CmdNode<LEN, CMD, M, S, G>
149{
150 async fn send_by_classified_tunnel(
151 &self,
152 classification: C,
153 cmd: CMD,
154 version: u8,
155 body: &[u8],
156 ) -> CmdResult<()>;
157 async fn send_by_classified_tunnel_with_resp(
158 &self,
159 classification: C,
160 cmd: CMD,
161 version: u8,
162 body: &[u8],
163 timeout: Duration,
164 ) -> CmdResult<CmdBody>;
165 async fn send2_by_classified_tunnel(
166 &self,
167 classification: C,
168 cmd: CMD,
169 version: u8,
170 body: &[&[u8]],
171 ) -> CmdResult<()>;
172 async fn send2_by_classified_tunnel_with_resp(
173 &self,
174 classification: C,
175 cmd: CMD,
176 version: u8,
177 body: &[&[u8]],
178 timeout: Duration,
179 ) -> CmdResult<CmdBody>;
180 async fn send_cmd_by_classified_tunnel(
181 &self,
182 classification: C,
183 cmd: CMD,
184 version: u8,
185 body: CmdBody,
186 ) -> CmdResult<()>;
187 async fn send_cmd_by_classified_tunnel_with_resp(
188 &self,
189 classification: C,
190 cmd: CMD,
191 version: u8,
192 body: CmdBody,
193 timeout: Duration,
194 ) -> CmdResult<CmdBody>;
195 async fn send_by_peer_classified_tunnel(
196 &self,
197 peer_id: &PeerId,
198 classification: C,
199 cmd: CMD,
200 version: u8,
201 body: &[u8],
202 ) -> CmdResult<()>;
203 async fn send_by_peer_classified_tunnel_with_resp(
204 &self,
205 peer_id: &PeerId,
206 classification: C,
207 cmd: CMD,
208 version: u8,
209 body: &[u8],
210 timeout: Duration,
211 ) -> CmdResult<CmdBody>;
212 async fn send2_by_peer_classified_tunnel(
213 &self,
214 peer_id: &PeerId,
215 classification: C,
216 cmd: CMD,
217 version: u8,
218 body: &[&[u8]],
219 ) -> CmdResult<()>;
220 async fn send2_by_peer_classified_tunnel_with_resp(
221 &self,
222 peer_id: &PeerId,
223 classification: C,
224 cmd: CMD,
225 version: u8,
226 body: &[&[u8]],
227 timeout: Duration,
228 ) -> CmdResult<CmdBody>;
229 async fn send_cmd_by_peer_classified_tunnel(
230 &self,
231 peer_id: &PeerId,
232 classification: C,
233 cmd: CMD,
234 version: u8,
235 body: CmdBody,
236 ) -> CmdResult<()>;
237 async fn send_cmd_by_peer_classified_tunnel_with_resp(
238 &self,
239 peer_id: &PeerId,
240 classification: C,
241 cmd: CMD,
242 version: u8,
243 body: CmdBody,
244 timeout: Duration,
245 ) -> CmdResult<CmdBody>;
246 async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
247 async fn find_tunnel_id_by_peer_classified(
248 &self,
249 peer_id: &PeerId,
250 classification: C,
251 ) -> CmdResult<TunnelId>;
252 async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
253 async fn get_send_by_peer_classified(
254 &self,
255 peer_id: &PeerId,
256 classification: C,
257 ) -> CmdResult<G>;
258}
259
260pub(crate) fn create_recv_handle<
261 M: CmdTunnelMeta,
262 R: CmdTunnelRead<M>,
263 W: CmdTunnelWrite<M>,
264 LEN: RawEncode
265 + for<'a> RawDecode<'a>
266 + Copy
267 + Send
268 + Sync
269 + 'static
270 + FromPrimitive
271 + ToPrimitive
272 + RawFixedBytes,
273 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
274>(
275 mut reader: RHalf<R, W>,
276 write: ObjectHolder<WHalf<R, W>>,
277 tunnel_id: TunnelId,
278 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
279) -> JoinHandle<CmdResult<()>> {
280 let recv_handle = tokio::spawn(async move {
281 let ret: CmdResult<()> = async move {
282 let remote_id = reader.get_remote_peer_id();
283 loop {
284 log::trace!("tunnel {:?} enter recv proc", tunnel_id);
285 let header_len = reader
286 .read_u8()
287 .await
288 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
289 log::trace!("tunnel {:?} recv cmd len {}", tunnel_id, header_len);
290 let mut header = vec![0u8; header_len as usize];
291 let n = reader
292 .read_exact(&mut header)
293 .await
294 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
295 if n == 0 {
296 break;
297 }
298 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
299 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
300 log::trace!(
301 "tunnel {:?} recv cmd {:?} from {} len {}",
302 tunnel_id,
303 header.cmd_code(),
304 remote_id.to_base36(),
305 header.pkg_len().to_u64().unwrap()
306 );
307 let body_len = header.pkg_len().to_u64().unwrap();
308 let cmd_read =
309 CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
310 let waiter = cmd_read.get_waiter();
311 let future = waiter
312 .create_result_future()
313 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
314 {
315 let version = header.version();
316 let seq = header.seq();
317 let cmd_code = header.cmd_code();
318 let body_read = cmd_read;
319 match cmd_handler
320 .handle(
321 remote_id.clone(),
322 tunnel_id,
323 header,
324 CmdBody::from_reader(BufReader::new(body_read), body_len),
325 )
326 .await
327 {
328 Ok(Some(mut body)) => {
329 let mut write = write.get().await;
330 let header = CmdHeader::<LEN, CMD>::new(
331 version,
332 true,
333 seq,
334 cmd_code,
335 LEN::from_u64(body.len()).unwrap(),
336 );
337 let buf = header
338 .to_vec()
339 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
340 if buf.len() > 255 {
341 return Err(cmd_err!(
342 CmdErrorCode::RawCodecError,
343 "header too long"
344 ));
345 }
346 write
347 .write_u8(buf.len() as u8)
348 .await
349 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
350 write
351 .write_all(buf.as_slice())
352 .await
353 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
354 tokio::io::copy(&mut body, write.deref_mut().deref_mut())
355 .await
356 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
357 write
358 .flush()
359 .await
360 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
361 }
362 Err(e) => {
363 log::error!("handle cmd error: {:?}", e);
364 }
365 _ => {}
366 }
367 };
368 reader = future
369 .await
370 .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
371 }
373 Ok(())
374 }
375 .await;
376 ret
377 });
378 recv_handle
379}