1mod classified_node;
2mod node;
3
4use crate::client::{CmdSend, SendGuard};
5use crate::cmd::CmdBodyRead;
6use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
7use crate::{
8 CmdBody, CmdHandler, CmdHeader, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId,
9};
10use async_named_locker::ObjectHolder;
11use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
12pub use classified_node::*;
13pub use node::*;
14use num::{FromPrimitive, ToPrimitive};
15use sfo_pool::WorkerClassification;
16use sfo_split::{RHalf, WHalf};
17use std::fmt::Debug;
18use std::hash::Hash;
19use std::ops::DerefMut;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
23use tokio::task::JoinHandle;
24
25#[async_trait::async_trait]
26pub trait CmdNode<
27 LEN: RawEncode
28 + for<'a> RawDecode<'a>
29 + Copy
30 + RawFixedBytes
31 + Sync
32 + Send
33 + 'static
34 + FromPrimitive
35 + ToPrimitive,
36 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
37 M: CmdTunnelMeta,
38 S: CmdSend<M>,
39 G: SendGuard<M, S>,
40>
41{
42 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
43 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
44 async fn send_with_resp(
45 &self,
46 peer_id: &PeerId,
47 cmd: CMD,
48 version: u8,
49 body: &[u8],
50 timeout: Duration,
51 ) -> CmdResult<CmdBody>;
52 async fn send_parts(
53 &self,
54 peer_id: &PeerId,
55 cmd: CMD,
56 version: u8,
57 body: &[&[u8]],
58 ) -> CmdResult<()>;
59 async fn send_parts_with_resp(
60 &self,
61 peer_id: &PeerId,
62 cmd: CMD,
63 version: u8,
64 body: &[&[u8]],
65 timeout: Duration,
66 ) -> CmdResult<CmdBody>;
67 #[deprecated(note = "use send_parts instead")]
68 async fn send2(
69 &self,
70 peer_id: &PeerId,
71 cmd: CMD,
72 version: u8,
73 body: &[&[u8]],
74 ) -> CmdResult<()> {
75 self.send_parts(peer_id, cmd, version, body).await
76 }
77 #[deprecated(note = "use send_parts_with_resp instead")]
78 async fn send2_with_resp(
79 &self,
80 peer_id: &PeerId,
81 cmd: CMD,
82 version: u8,
83 body: &[&[u8]],
84 timeout: Duration,
85 ) -> CmdResult<CmdBody> {
86 self.send_parts_with_resp(peer_id, cmd, version, body, timeout)
87 .await
88 }
89 async fn send_cmd(
90 &self,
91 peer_id: &PeerId,
92 cmd: CMD,
93 version: u8,
94 body: CmdBody,
95 ) -> CmdResult<()>;
96 async fn send_cmd_with_resp(
97 &self,
98 peer_id: &PeerId,
99 cmd: CMD,
100 version: u8,
101 body: CmdBody,
102 timeout: Duration,
103 ) -> CmdResult<CmdBody>;
104 async fn send_by_specify_tunnel(
105 &self,
106 peer_id: &PeerId,
107 tunnel_id: TunnelId,
108 cmd: CMD,
109 version: u8,
110 body: &[u8],
111 ) -> CmdResult<()>;
112 async fn send_by_specify_tunnel_with_resp(
113 &self,
114 peer_id: &PeerId,
115 tunnel_id: TunnelId,
116 cmd: CMD,
117 version: u8,
118 body: &[u8],
119 timeout: Duration,
120 ) -> CmdResult<CmdBody>;
121 async fn send_parts_by_specify_tunnel(
122 &self,
123 peer_id: &PeerId,
124 tunnel_id: TunnelId,
125 cmd: CMD,
126 version: u8,
127 body: &[&[u8]],
128 ) -> CmdResult<()>;
129 async fn send_parts_by_specify_tunnel_with_resp(
130 &self,
131 peer_id: &PeerId,
132 tunnel_id: TunnelId,
133 cmd: CMD,
134 version: u8,
135 body: &[&[u8]],
136 timeout: Duration,
137 ) -> CmdResult<CmdBody>;
138 #[deprecated(note = "use send_parts_by_specify_tunnel instead")]
139 async fn send2_by_specify_tunnel(
140 &self,
141 peer_id: &PeerId,
142 tunnel_id: TunnelId,
143 cmd: CMD,
144 version: u8,
145 body: &[&[u8]],
146 ) -> CmdResult<()> {
147 self.send_parts_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
148 .await
149 }
150 #[deprecated(note = "use send_parts_by_specify_tunnel_with_resp instead")]
151 async fn send2_by_specify_tunnel_with_resp(
152 &self,
153 peer_id: &PeerId,
154 tunnel_id: TunnelId,
155 cmd: CMD,
156 version: u8,
157 body: &[&[u8]],
158 timeout: Duration,
159 ) -> CmdResult<CmdBody> {
160 self.send_parts_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
161 .await
162 }
163 async fn send_cmd_by_specify_tunnel(
164 &self,
165 peer_id: &PeerId,
166 tunnel_id: TunnelId,
167 cmd: CMD,
168 version: u8,
169 body: CmdBody,
170 ) -> CmdResult<()>;
171 async fn send_cmd_by_specify_tunnel_with_resp(
172 &self,
173 peer_id: &PeerId,
174 tunnel_id: TunnelId,
175 cmd: CMD,
176 version: u8,
177 body: CmdBody,
178 timeout: Duration,
179 ) -> CmdResult<CmdBody>;
180 async fn clear_all_tunnel(&self);
181 async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<G>;
182}
183
184#[async_trait::async_trait]
185pub trait ClassifiedCmdNode<
186 LEN: RawEncode
187 + for<'a> RawDecode<'a>
188 + Copy
189 + RawFixedBytes
190 + Sync
191 + Send
192 + 'static
193 + FromPrimitive
194 + ToPrimitive,
195 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
196 C: WorkerClassification,
197 M: CmdTunnelMeta,
198 S: CmdSend<M>,
199 G: SendGuard<M, S>,
200>: CmdNode<LEN, CMD, M, S, G>
201{
202 async fn send_by_classified_tunnel(
203 &self,
204 classification: C,
205 cmd: CMD,
206 version: u8,
207 body: &[u8],
208 ) -> CmdResult<()>;
209 async fn send_by_classified_tunnel_with_resp(
210 &self,
211 classification: C,
212 cmd: CMD,
213 version: u8,
214 body: &[u8],
215 timeout: Duration,
216 ) -> CmdResult<CmdBody>;
217 async fn send_parts_by_classified_tunnel(
218 &self,
219 classification: C,
220 cmd: CMD,
221 version: u8,
222 body: &[&[u8]],
223 ) -> CmdResult<()>;
224 async fn send_parts_by_classified_tunnel_with_resp(
225 &self,
226 classification: C,
227 cmd: CMD,
228 version: u8,
229 body: &[&[u8]],
230 timeout: Duration,
231 ) -> CmdResult<CmdBody>;
232 #[deprecated(note = "use send_parts_by_classified_tunnel instead")]
233 async fn send2_by_classified_tunnel(
234 &self,
235 classification: C,
236 cmd: CMD,
237 version: u8,
238 body: &[&[u8]],
239 ) -> CmdResult<()> {
240 self.send_parts_by_classified_tunnel(classification, cmd, version, body)
241 .await
242 }
243 #[deprecated(note = "use send_parts_by_classified_tunnel_with_resp instead")]
244 async fn send2_by_classified_tunnel_with_resp(
245 &self,
246 classification: C,
247 cmd: CMD,
248 version: u8,
249 body: &[&[u8]],
250 timeout: Duration,
251 ) -> CmdResult<CmdBody> {
252 self.send_parts_by_classified_tunnel_with_resp(classification, cmd, version, body, timeout)
253 .await
254 }
255 async fn send_cmd_by_classified_tunnel(
256 &self,
257 classification: C,
258 cmd: CMD,
259 version: u8,
260 body: CmdBody,
261 ) -> CmdResult<()>;
262 async fn send_cmd_by_classified_tunnel_with_resp(
263 &self,
264 classification: C,
265 cmd: CMD,
266 version: u8,
267 body: CmdBody,
268 timeout: Duration,
269 ) -> CmdResult<CmdBody>;
270 async fn send_by_peer_classified_tunnel(
271 &self,
272 peer_id: &PeerId,
273 classification: C,
274 cmd: CMD,
275 version: u8,
276 body: &[u8],
277 ) -> CmdResult<()>;
278 async fn send_by_peer_classified_tunnel_with_resp(
279 &self,
280 peer_id: &PeerId,
281 classification: C,
282 cmd: CMD,
283 version: u8,
284 body: &[u8],
285 timeout: Duration,
286 ) -> CmdResult<CmdBody>;
287 async fn send_parts_by_peer_classified_tunnel(
288 &self,
289 peer_id: &PeerId,
290 classification: C,
291 cmd: CMD,
292 version: u8,
293 body: &[&[u8]],
294 ) -> CmdResult<()>;
295 async fn send_parts_by_peer_classified_tunnel_with_resp(
296 &self,
297 peer_id: &PeerId,
298 classification: C,
299 cmd: CMD,
300 version: u8,
301 body: &[&[u8]],
302 timeout: Duration,
303 ) -> CmdResult<CmdBody>;
304 #[deprecated(note = "use send_parts_by_peer_classified_tunnel instead")]
305 async fn send2_by_peer_classified_tunnel(
306 &self,
307 peer_id: &PeerId,
308 classification: C,
309 cmd: CMD,
310 version: u8,
311 body: &[&[u8]],
312 ) -> CmdResult<()> {
313 self.send_parts_by_peer_classified_tunnel(peer_id, classification, cmd, version, body)
314 .await
315 }
316 #[deprecated(note = "use send_parts_by_peer_classified_tunnel_with_resp instead")]
317 async fn send2_by_peer_classified_tunnel_with_resp(
318 &self,
319 peer_id: &PeerId,
320 classification: C,
321 cmd: CMD,
322 version: u8,
323 body: &[&[u8]],
324 timeout: Duration,
325 ) -> CmdResult<CmdBody> {
326 self.send_parts_by_peer_classified_tunnel_with_resp(
327 peer_id,
328 classification,
329 cmd,
330 version,
331 body,
332 timeout,
333 )
334 .await
335 }
336 async fn send_cmd_by_peer_classified_tunnel(
337 &self,
338 peer_id: &PeerId,
339 classification: C,
340 cmd: CMD,
341 version: u8,
342 body: CmdBody,
343 ) -> CmdResult<()>;
344 async fn send_cmd_by_peer_classified_tunnel_with_resp(
345 &self,
346 peer_id: &PeerId,
347 classification: C,
348 cmd: CMD,
349 version: u8,
350 body: CmdBody,
351 timeout: Duration,
352 ) -> CmdResult<CmdBody>;
353 async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
354 async fn find_tunnel_id_by_peer_classified(
355 &self,
356 peer_id: &PeerId,
357 classification: C,
358 ) -> CmdResult<TunnelId>;
359 async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
360 async fn get_send_by_peer_classified(
361 &self,
362 peer_id: &PeerId,
363 classification: C,
364 ) -> CmdResult<G>;
365}
366
367pub(crate) fn create_recv_handle<
368 M: CmdTunnelMeta,
369 R: CmdTunnelRead<M>,
370 W: CmdTunnelWrite<M>,
371 LEN: RawEncode
372 + for<'a> RawDecode<'a>
373 + Copy
374 + Send
375 + Sync
376 + 'static
377 + FromPrimitive
378 + ToPrimitive
379 + RawFixedBytes,
380 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
381>(
382 mut reader: RHalf<R, W>,
383 write: ObjectHolder<WHalf<R, W>>,
384 tunnel_id: TunnelId,
385 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
386) -> JoinHandle<CmdResult<()>> {
387 let recv_handle = tokio::spawn(async move {
388 let ret: CmdResult<()> = async move {
389 let local_id = reader.get_local_peer_id();
390 let remote_id = reader.get_remote_peer_id();
391 loop {
392 log::trace!("tunnel {:?} enter recv proc", tunnel_id);
393 let header_len = reader
394 .read_u8()
395 .await
396 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
397 log::trace!("tunnel {:?} recv cmd len {}", tunnel_id, header_len);
398 let mut header = vec![0u8; header_len as usize];
399 let n = reader
400 .read_exact(&mut header)
401 .await
402 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
403 if n == 0 {
404 break;
405 }
406 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
407 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
408 log::trace!(
409 "tunnel {:?} recv cmd {:?} from {} len {}",
410 tunnel_id,
411 header.cmd_code(),
412 remote_id.to_base36(),
413 header.pkg_len().to_u64().unwrap()
414 );
415 let body_len = header.pkg_len().to_u64().unwrap();
416 let cmd_read =
417 CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
418 let waiter = cmd_read.get_waiter();
419 let future = waiter
420 .create_result_future()
421 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
422 {
423 let version = header.version();
424 let seq = header.seq();
425 let cmd_code = header.cmd_code();
426 let body_read = cmd_read;
427 match cmd_handler
428 .handle(
429 local_id.clone(),
430 remote_id.clone(),
431 tunnel_id,
432 header,
433 CmdBody::from_reader(BufReader::new(body_read), body_len),
434 )
435 .await
436 {
437 Ok(Some(mut body)) => {
438 let mut write = write.get().await;
439 let header = CmdHeader::<LEN, CMD>::new(
440 version,
441 true,
442 seq,
443 cmd_code,
444 LEN::from_u64(body.len()).unwrap(),
445 );
446 let buf = header
447 .to_vec()
448 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
449 if buf.len() > 255 {
450 return Err(cmd_err!(
451 CmdErrorCode::RawCodecError,
452 "header too long"
453 ));
454 }
455 write
456 .write_u8(buf.len() as u8)
457 .await
458 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
459 write
460 .write_all(buf.as_slice())
461 .await
462 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
463 tokio::io::copy(&mut body, write.deref_mut().deref_mut())
464 .await
465 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
466 write
467 .flush()
468 .await
469 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
470 }
471 Err(e) => {
472 log::error!("handle cmd error: {:?}", e);
473 }
474 _ => {}
475 }
476 };
477 reader = future
478 .await
479 .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
480 }
482 Ok(())
483 }
484 .await;
485 ret
486 });
487 recv_handle
488}