1use super::CmdServer;
2use super::peer_manager::{PeerManager, PeerManagerRef};
3use crate::client::{RespWaiter, RespWaiterRef, gen_resp_id, gen_seq};
4use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
5use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
6use crate::peer_connection::PeerConnection;
7use crate::peer_id::PeerId;
8use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
9use async_named_locker::{NamedStateHolder, ObjectHolder};
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_split::Splittable;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::marker::PhantomData;
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
20
21#[async_trait::async_trait]
22pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
23 Send + Sync + 'static
24{
25 async fn accept(&self) -> CmdResult<Splittable<R, W>>;
26}
27
28#[async_trait::async_trait]
29pub trait CmdTunnelService<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
30 Send + Sync + 'static
31{
32 async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()>;
33}
34
35#[async_trait::async_trait]
36pub trait CmdServerEventListener: Send + Sync + 'static {
37 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
38 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
39}
40
41#[derive(Clone)]
42struct CmdServerEventListenerEmit {
43 listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
44}
45
46impl CmdServerEventListenerEmit {
47 pub fn new() -> Self {
48 Self {
49 listeners: Arc::new(Mutex::new(Vec::new())),
50 }
51 }
52
53 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
54 self.listeners.lock().unwrap().push(event_listener);
55 }
56}
57
58#[async_trait::async_trait]
59impl CmdServerEventListener for CmdServerEventListenerEmit {
60 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
61 let listeners = { self.listeners.lock().unwrap().clone() };
62 for listener in listeners.iter() {
63 if let Err(e) = listener.on_peer_connected(peer_id).await {
64 log::error!("on_peer_connected error: {:?}", e);
65 }
66 }
67 Ok(())
68 }
69
70 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
71 let listeners = { self.listeners.lock().unwrap().clone() };
72 for listener in listeners.iter() {
73 if let Err(e) = listener.on_peer_disconnected(peer_id).await {
74 log::error!("on_peer_disconnected error: {:?}", e);
75 }
76 }
77 Ok(())
78 }
79}
80
81pub struct DefaultCmdServerService<
82 M: CmdTunnelMeta,
83 R: CmdTunnelRead<M>,
84 W: CmdTunnelWrite<M>,
85 LEN,
86 CMD,
87> {
88 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
89 peer_manager: PeerManagerRef<M, R, W>,
90 event_emit: CmdServerEventListenerEmit,
91 resp_waiter: RespWaiterRef,
92 state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
93 _p: PhantomData<fn() -> (M, R, W, LEN, CMD)>,
94}
95
96impl<
97 M: CmdTunnelMeta,
98 R: CmdTunnelRead<M>,
99 W: CmdTunnelWrite<M>,
100 LEN: RawEncode
101 + for<'a> RawDecode<'a>
102 + Copy
103 + RawFixedBytes
104 + Sync
105 + Send
106 + 'static
107 + FromPrimitive
108 + ToPrimitive,
109 CMD: RawEncode
110 + for<'a> RawDecode<'a>
111 + Copy
112 + RawFixedBytes
113 + Sync
114 + Send
115 + 'static
116 + Eq
117 + Hash
118 + Debug,
119> DefaultCmdServerService<M, R, W, LEN, CMD>
120{
121 pub fn new() -> Arc<Self> {
122 let event_emit = CmdServerEventListenerEmit::new();
123 Arc::new(Self {
124 cmd_handler_map: Arc::new(CmdHandlerMap::new()),
125 peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
126 event_emit,
127 resp_waiter: Arc::new(RespWaiter::new()),
128 state_holder: NamedStateHolder::new(),
129 _p: PhantomData,
130 })
131 }
132
133 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
134 self.event_emit.attach_event_listener(event_listener);
135 }
136
137 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
138 self.peer_manager.find_connections(peer_id)
139 }
140
141 pub async fn serve_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
142 let peer_id = tunnel.get_remote_peer_id();
143 let tunnel_id = self.peer_manager.generate_conn_id();
144 let cmd_handler_map = self.cmd_handler_map.clone();
145 let resp_waiter = self.resp_waiter.clone();
146 let state_holder = self.state_holder.clone();
147 let (mut reader, writer) = tunnel.split();
148 let local_id = reader.get_local_peer_id();
149 let writer = ObjectHolder::new(writer);
150 let resp_write = writer.clone();
151 let remote_id = peer_id.clone();
152 let recv_handle = tokio::spawn(async move {
153 let ret: CmdResult<()> = async move {
154 loop {
155 let header_len = reader
156 .read_u8()
157 .await
158 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
159 let mut header = vec![0u8; header_len as usize];
160 let n = reader
161 .read_exact(&mut header)
162 .await
163 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
164 if n == 0 {
165 break;
166 }
167 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
168 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
169 sfo_log::debug!("recv cmd {:?}", header.cmd_code());
170 let body_len = header.pkg_len().to_u64().unwrap();
171 let cmd_read = CmdBodyRead::new(reader, body_len as usize);
172 let waiter = cmd_read.get_waiter();
173 let future = waiter
174 .create_result_future()
175 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
176 {
177 let body = CmdBody::from_reader(BufReader::new(cmd_read), body_len);
178 if header.is_resp() && header.seq().is_some() {
179 let resp_id =
180 gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
181 let _ = resp_waiter.set_result(resp_id, body);
182 } else if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
183 let version = header.version();
184 let seq = header.seq();
185 let cmd_code = header.cmd_code();
186 match {
187 let _handle_state = state_holder.new_state(tokio::task::id());
188 handler
189 .handle(
190 local_id.clone(),
191 remote_id.clone(),
192 tunnel_id,
193 header,
194 body,
195 )
196 .await
197 } {
198 Ok(Some(mut body)) => {
199 let mut write = resp_write.get().await;
200 let header = CmdHeader::<LEN, CMD>::new(
201 version,
202 true,
203 seq,
204 cmd_code,
205 LEN::from_u64(body.len()).unwrap(),
206 );
207 let buf = header
208 .to_vec()
209 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
210 if buf.len() > 255 {
211 return Err(cmd_err!(
212 CmdErrorCode::RawCodecError,
213 "header len too large"
214 ));
215 }
216 write
217 .write_u8(buf.len() as u8)
218 .await
219 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
220 write
221 .write_all(buf.as_slice())
222 .await
223 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
224 tokio::io::copy(&mut body, write.deref_mut().deref_mut())
225 .await
226 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
227 write
228 .flush()
229 .await
230 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
231 }
232 Err(e) => {
233 log::error!("handle cmd error: {:?}", e);
234 }
235 Ok(None) => {}
236 }
237 }
238 }
239 reader = future
240 .await
241 .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
242 }
243 Ok(())
244 }
245 .await;
246 if let Err(e) = ret.as_ref() {
247 log::error!("recv cmd error: {:?}", e);
248 }
249 ret
250 });
251
252 let peer_conn = PeerConnection {
253 conn_id: tunnel_id,
254 peer_id,
255 send: writer,
256 handle: Some(recv_handle),
257 };
258 self.peer_manager.add_peer_connection(peer_conn).await;
259 Ok(())
260 }
261}
262
263#[async_trait::async_trait]
264impl<
265 M: CmdTunnelMeta,
266 R: CmdTunnelRead<M>,
267 W: CmdTunnelWrite<M>,
268 LEN: RawEncode
269 + for<'a> RawDecode<'a>
270 + Copy
271 + RawFixedBytes
272 + Sync
273 + Send
274 + 'static
275 + FromPrimitive
276 + ToPrimitive,
277 CMD: RawEncode
278 + for<'a> RawDecode<'a>
279 + Copy
280 + RawFixedBytes
281 + Sync
282 + Send
283 + 'static
284 + Eq
285 + Hash
286 + Debug,
287> CmdTunnelService<M, R, W> for DefaultCmdServerService<M, R, W, LEN, CMD>
288{
289 async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
290 self.serve_tunnel(tunnel).await
291 }
292}
293
294#[async_trait::async_trait]
295impl<
296 M: CmdTunnelMeta,
297 R: CmdTunnelRead<M>,
298 W: CmdTunnelWrite<M>,
299 LEN: RawEncode
300 + for<'a> RawDecode<'a>
301 + Copy
302 + RawFixedBytes
303 + Sync
304 + Send
305 + 'static
306 + FromPrimitive
307 + ToPrimitive,
308 CMD: RawEncode
309 + for<'a> RawDecode<'a>
310 + Copy
311 + RawFixedBytes
312 + Sync
313 + Send
314 + 'static
315 + Eq
316 + Hash
317 + Debug,
318> CmdServer<LEN, CMD> for DefaultCmdServerService<M, R, W, LEN, CMD>
319{
320 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
321 self.cmd_handler_map.insert(cmd, handler);
322 }
323
324 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
325 let connections = self.peer_manager.find_connections(peer_id);
326 for conn in connections {
327 let ret: CmdResult<()> = async move {
328 log::debug!(
329 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
330 peer_id,
331 conn.conn_id,
332 cmd,
333 body.len(),
334 hex::encode(body)
335 );
336 let header = CmdHeader::<LEN, CMD>::new(
337 version,
338 false,
339 None,
340 cmd,
341 LEN::from_u64(body.len() as u64).unwrap(),
342 );
343 let buf = header
344 .to_vec()
345 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
346 let mut send = conn.send.get().await;
347 if buf.len() > 255 {
348 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
349 }
350 send.write_u8(buf.len() as u8)
351 .await
352 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
353 send.write_all(buf.as_slice())
354 .await
355 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
356 send.write_all(body)
357 .await
358 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
359 send.flush()
360 .await
361 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
362 Ok(())
363 }
364 .await;
365 if ret.is_ok() {
366 break;
367 }
368 }
369 Ok(())
370 }
371
372 async fn send_with_resp(
373 &self,
374 peer_id: &PeerId,
375 cmd: CMD,
376 version: u8,
377 body: &[u8],
378 timeout: Duration,
379 ) -> CmdResult<CmdBody> {
380 let connections = self.peer_manager.find_connections(peer_id);
381 for conn in connections {
382 if let Some(id) = tokio::task::try_id() {
383 if self.state_holder.has_state(id) {
384 continue;
385 }
386 }
387 let ret: CmdResult<CmdBody> = async move {
388 log::debug!(
389 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
390 peer_id,
391 conn.conn_id,
392 cmd,
393 body.len(),
394 hex::encode(body)
395 );
396 let seq = gen_seq();
397 let header = CmdHeader::<LEN, CMD>::new(
398 version,
399 false,
400 Some(seq),
401 cmd,
402 LEN::from_u64(body.len() as u64).unwrap(),
403 );
404 let buf = header
405 .to_vec()
406 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
407 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
408 let waiter = self
409 .resp_waiter
410 .create_timeout_result_future(resp_id, timeout)
411 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
412 {
413 let mut send = conn.send.get().await;
414 if buf.len() > 255 {
415 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
416 }
417 send.write_u8(buf.len() as u8)
418 .await
419 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
420 send.write_all(buf.as_slice())
421 .await
422 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
423 send.write_all(body)
424 .await
425 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
426 send.flush()
427 .await
428 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
429 }
430 let body =
431 waiter
432 .await
433 .map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
434 Ok(body)
435 }
436 .await;
437 if ret.is_ok() {
438 return ret;
439 } else {
440 sfo_log::error!("send err {:?}", ret.unwrap_err());
441 }
442 }
443 Err(cmd_err!(
444 CmdErrorCode::Failed,
445 "send to peer_id: {}",
446 peer_id
447 ))
448 }
449
450 async fn send2(
451 &self,
452 peer_id: &PeerId,
453 cmd: CMD,
454 version: u8,
455 body: &[&[u8]],
456 ) -> CmdResult<()> {
457 let connections = self.peer_manager.find_connections(peer_id);
458 for conn in connections {
459 let ret: CmdResult<()> = async move {
460 let mut len = 0;
461 for b in body.iter() {
462 len += b.len();
463 log::debug!(
464 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
465 peer_id,
466 conn.conn_id,
467 cmd,
468 hex::encode(b)
469 );
470 }
471 log::debug!(
472 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
473 peer_id,
474 conn.conn_id,
475 cmd,
476 len
477 );
478 let header = CmdHeader::<LEN, CMD>::new(
479 version,
480 false,
481 None,
482 cmd,
483 LEN::from_u64(len as u64).unwrap(),
484 );
485 let buf = header
486 .to_vec()
487 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
488 let mut send = conn.send.get().await;
489 if buf.len() > 255 {
490 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
491 }
492 send.write_u8(buf.len() as u8)
493 .await
494 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
495 send.write_all(buf.as_slice())
496 .await
497 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
498 for b in body.iter() {
499 send.write_all(b)
500 .await
501 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
502 }
503 send.flush()
504 .await
505 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
506 Ok(())
507 }
508 .await;
509 if ret.is_ok() {
510 break;
511 }
512 }
513 Ok(())
514 }
515
516 async fn send2_with_resp(
517 &self,
518 peer_id: &PeerId,
519 cmd: CMD,
520 version: u8,
521 body: &[&[u8]],
522 timeout: Duration,
523 ) -> CmdResult<CmdBody> {
524 let connections = self.peer_manager.find_connections(peer_id);
525 for conn in connections {
526 if let Some(id) = tokio::task::try_id() {
527 if self.state_holder.has_state(id) {
528 continue;
529 }
530 }
531 let ret: CmdResult<CmdBody> = async move {
532 let mut len = 0;
533 for b in body.iter() {
534 len += b.len();
535 log::debug!(
536 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
537 peer_id,
538 conn.conn_id,
539 cmd,
540 hex::encode(b)
541 );
542 }
543 log::debug!(
544 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
545 peer_id,
546 conn.conn_id,
547 cmd,
548 len
549 );
550 let seq = gen_seq();
551 let header = CmdHeader::<LEN, CMD>::new(
552 version,
553 false,
554 Some(seq),
555 cmd,
556 LEN::from_u64(len as u64).unwrap(),
557 );
558 let buf = header
559 .to_vec()
560 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
561 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
562 let waiter = self
563 .resp_waiter
564 .create_timeout_result_future(resp_id, timeout)
565 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
566 {
567 let mut send = conn.send.get().await;
568 if buf.len() > 255 {
569 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
570 }
571 send.write_u8(buf.len() as u8)
572 .await
573 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
574 send.write_all(buf.as_slice())
575 .await
576 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
577 for b in body.iter() {
578 send.write_all(b)
579 .await
580 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
581 }
582 send.flush()
583 .await
584 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
585 }
586 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
587 Ok(body)
588 }
589 .await;
590 if ret.is_ok() {
591 return ret;
592 }
593 }
594 Err(cmd_err!(
595 CmdErrorCode::Failed,
596 "send to peer_id: {}",
597 peer_id
598 ))
599 }
600
601 async fn send_cmd(
602 &self,
603 peer_id: &PeerId,
604 cmd: CMD,
605 version: u8,
606 body: CmdBody,
607 ) -> CmdResult<()> {
608 let body_data = body.into_bytes().await?;
609 let body = body_data.as_slice();
610 let connections = self.peer_manager.find_connections(peer_id);
611 for conn in connections {
612 let ret: CmdResult<()> = async move {
613 log::debug!(
614 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
615 peer_id,
616 conn.conn_id,
617 cmd,
618 body.len(),
619 hex::encode(body)
620 );
621 let header = CmdHeader::<LEN, CMD>::new(
622 version,
623 false,
624 None,
625 cmd,
626 LEN::from_u64(body.len() as u64).unwrap(),
627 );
628 let buf = header
629 .to_vec()
630 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
631 let mut send = conn.send.get().await;
632 if buf.len() > 255 {
633 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
634 }
635 send.write_u8(buf.len() as u8)
636 .await
637 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
638 send.write_all(buf.as_slice())
639 .await
640 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
641 send.write_all(body)
642 .await
643 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
644 send.flush()
645 .await
646 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
647 Ok(())
648 }
649 .await;
650 if ret.is_ok() {
651 break;
652 }
653 }
654 Ok(())
655 }
656
657 async fn send_cmd_with_resp(
658 &self,
659 peer_id: &PeerId,
660 cmd: CMD,
661 version: u8,
662 body: CmdBody,
663 timeout: Duration,
664 ) -> CmdResult<CmdBody> {
665 let connections = self.peer_manager.find_connections(peer_id);
666 let body_data = body.into_bytes().await?;
667 let data_ref = body_data.as_slice();
668 for conn in connections {
669 if let Some(id) = tokio::task::try_id() {
670 if self.state_holder.has_state(id) {
671 continue;
672 }
673 }
674 let ret: CmdResult<CmdBody> = async move {
675 log::debug!(
676 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}",
677 peer_id,
678 conn.conn_id,
679 cmd,
680 data_ref.len()
681 );
682 let seq = gen_seq();
683 let header = CmdHeader::<LEN, CMD>::new(
684 version,
685 false,
686 Some(seq),
687 cmd,
688 LEN::from_u64(data_ref.len() as u64).unwrap(),
689 );
690 let buf = header
691 .to_vec()
692 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
693 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
694 let waiter = self
695 .resp_waiter
696 .create_timeout_result_future(resp_id, timeout)
697 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
698 {
699 let mut send = conn.send.get().await;
700 if buf.len() > 255 {
701 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
702 }
703 send.write_u8(buf.len() as u8)
704 .await
705 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
706 send.write_all(buf.as_slice())
707 .await
708 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
709 send.write_all(data_ref)
710 .await
711 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
712 send.flush()
713 .await
714 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
715 }
716 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
717 Ok(body)
718 }
719 .await;
720 if ret.is_ok() {
721 return ret;
722 }
723 }
724 Err(cmd_err!(
725 CmdErrorCode::Failed,
726 "send to peer_id: {}",
727 peer_id
728 ))
729 }
730
731 async fn send_by_specify_tunnel(
732 &self,
733 peer_id: &PeerId,
734 tunnel_id: TunnelId,
735 cmd: CMD,
736 version: u8,
737 body: &[u8],
738 ) -> CmdResult<()> {
739 let conn = self.peer_manager.find_connection(tunnel_id);
740 if conn.is_none() {
741 return Err(cmd_err!(
742 CmdErrorCode::PeerConnectionNotFound,
743 "tunnel_id: {:?}",
744 tunnel_id
745 ));
746 }
747 let conn = conn.unwrap();
748 assert_eq!(tunnel_id, conn.conn_id);
749 log::trace!(
750 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
751 peer_id,
752 conn.conn_id,
753 cmd,
754 body.len(),
755 hex::encode(body)
756 );
757 let header = CmdHeader::<LEN, CMD>::new(
758 version,
759 false,
760 None,
761 cmd,
762 LEN::from_u64(body.len() as u64).unwrap(),
763 );
764 let buf = header
765 .to_vec()
766 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
767 let mut send = conn.send.get().await;
768 if buf.len() > 255 {
769 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
770 }
771 send.write_u8(buf.len() as u8)
772 .await
773 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
774 send.write_all(buf.as_slice())
775 .await
776 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
777 send.write_all(body)
778 .await
779 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
780 send.flush()
781 .await
782 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
783 Ok(())
784 }
785
786 async fn send_by_specify_tunnel_with_resp(
787 &self,
788 peer_id: &PeerId,
789 tunnel_id: TunnelId,
790 cmd: CMD,
791 version: u8,
792 body: &[u8],
793 timeout: Duration,
794 ) -> CmdResult<CmdBody> {
795 let conn = self.peer_manager.find_connection(tunnel_id);
796 if conn.is_none() {
797 return Err(cmd_err!(
798 CmdErrorCode::PeerConnectionNotFound,
799 "tunnel_id: {:?}",
800 tunnel_id
801 ));
802 }
803 let conn = conn.unwrap();
804 if let Some(id) = tokio::task::try_id() {
805 if self.state_holder.has_state(id) {
806 return Err(cmd_err!(
807 CmdErrorCode::Failed,
808 "can't send msg with resp in tunnel {:?} msg handle",
809 conn.conn_id
810 ));
811 }
812 }
813 assert_eq!(tunnel_id, conn.conn_id);
814 log::trace!(
815 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
816 peer_id,
817 conn.conn_id,
818 cmd,
819 body.len(),
820 hex::encode(body)
821 );
822 let seq = gen_seq();
823 let header = CmdHeader::<LEN, CMD>::new(
824 version,
825 false,
826 Some(seq),
827 cmd,
828 LEN::from_u64(body.len() as u64).unwrap(),
829 );
830 let buf = header
831 .to_vec()
832 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
833 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
834 let waiter = self
835 .resp_waiter
836 .create_timeout_result_future(resp_id, timeout)
837 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
838 {
839 let mut send = conn.send.get().await;
840 if buf.len() > 255 {
841 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
842 }
843 send.write_u8(buf.len() as u8)
844 .await
845 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
846 send.write_all(buf.as_slice())
847 .await
848 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
849 send.write_all(body)
850 .await
851 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
852 send.flush()
853 .await
854 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
855 }
856 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
857 Ok(body)
858 }
859
860 async fn send2_by_specify_tunnel(
861 &self,
862 peer_id: &PeerId,
863 tunnel_id: TunnelId,
864 cmd: CMD,
865 version: u8,
866 body: &[&[u8]],
867 ) -> CmdResult<()> {
868 let conn = self.peer_manager.find_connection(tunnel_id);
869 if conn.is_none() {
870 return Err(cmd_err!(
871 CmdErrorCode::PeerConnectionNotFound,
872 "tunnel_id: {:?}",
873 tunnel_id
874 ));
875 }
876 let conn = conn.unwrap();
877 assert_eq!(tunnel_id, conn.conn_id);
878 let mut len = 0;
879 for b in body.iter() {
880 len += b.len();
881 log::debug!(
882 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
883 peer_id,
884 conn.conn_id,
885 cmd,
886 hex::encode(b)
887 );
888 }
889 log::debug!(
890 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
891 peer_id,
892 conn.conn_id,
893 cmd,
894 len
895 );
896 let header = CmdHeader::<LEN, CMD>::new(
897 version,
898 false,
899 None,
900 cmd,
901 LEN::from_u64(len as u64).unwrap(),
902 );
903 let buf = header
904 .to_vec()
905 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
906 if buf.len() > 255 {
907 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
908 }
909 let mut send = conn.send.get().await;
910 send.write_u8(buf.len() as u8)
911 .await
912 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
913 send.write_all(buf.as_slice())
914 .await
915 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
916 for b in body.iter() {
917 send.write_all(b)
918 .await
919 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
920 }
921 send.flush()
922 .await
923 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
924 Ok(())
925 }
926
927 async fn send2_by_specify_tunnel_with_resp(
928 &self,
929 peer_id: &PeerId,
930 tunnel_id: TunnelId,
931 cmd: CMD,
932 version: u8,
933 body: &[&[u8]],
934 timeout: Duration,
935 ) -> CmdResult<CmdBody> {
936 let conn = self.peer_manager.find_connection(tunnel_id);
937 if conn.is_none() {
938 return Err(cmd_err!(
939 CmdErrorCode::PeerConnectionNotFound,
940 "tunnel_id: {:?}",
941 tunnel_id
942 ));
943 }
944 let conn = conn.unwrap();
945 if let Some(id) = tokio::task::try_id() {
946 if self.state_holder.has_state(id) {
947 return Err(cmd_err!(
948 CmdErrorCode::Failed,
949 "can't send msg with resp in tunnel {:?} msg handle",
950 conn.conn_id
951 ));
952 }
953 }
954 assert_eq!(tunnel_id, conn.conn_id);
955 let mut len = 0;
956 for b in body.iter() {
957 len += b.len();
958 log::debug!(
959 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
960 peer_id,
961 conn.conn_id,
962 cmd,
963 hex::encode(b)
964 );
965 }
966 log::debug!(
967 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
968 peer_id,
969 conn.conn_id,
970 cmd,
971 len
972 );
973 let seq = gen_seq();
974 let header = CmdHeader::<LEN, CMD>::new(
975 version,
976 false,
977 Some(seq),
978 cmd,
979 LEN::from_u64(len as u64).unwrap(),
980 );
981 let buf = header
982 .to_vec()
983 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
984 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
985 let waiter = self
986 .resp_waiter
987 .create_timeout_result_future(resp_id, timeout)
988 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
989 if buf.len() > 255 {
990 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
991 }
992 {
993 let mut send = conn.send.get().await;
994 send.write_u8(buf.len() as u8)
995 .await
996 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
997 send.write_all(buf.as_slice())
998 .await
999 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1000 for b in body.iter() {
1001 send.write_all(b)
1002 .await
1003 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1004 }
1005 send.flush()
1006 .await
1007 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1008 }
1009 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1010 Ok(body)
1011 }
1012
1013 async fn send_cmd_by_specify_tunnel(
1014 &self,
1015 peer_id: &PeerId,
1016 tunnel_id: TunnelId,
1017 cmd: CMD,
1018 version: u8,
1019 mut body: CmdBody,
1020 ) -> CmdResult<()> {
1021 let conn = self.peer_manager.find_connection(tunnel_id);
1022 if conn.is_none() {
1023 return Err(cmd_err!(
1024 CmdErrorCode::PeerConnectionNotFound,
1025 "tunnel_id: {:?}",
1026 tunnel_id
1027 ));
1028 }
1029 let conn = conn.unwrap();
1030 assert_eq!(tunnel_id, conn.conn_id);
1031 log::debug!(
1032 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1033 peer_id,
1034 conn.conn_id,
1035 cmd,
1036 body.len()
1037 );
1038 let header = CmdHeader::<LEN, CMD>::new(
1039 version,
1040 false,
1041 None,
1042 cmd,
1043 LEN::from_u64(body.len()).unwrap(),
1044 );
1045 let buf = header
1046 .to_vec()
1047 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1048 let mut send = conn.send.get().await;
1049 if buf.len() > 255 {
1050 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1051 }
1052 send.write_u8(buf.len() as u8)
1053 .await
1054 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1055 send.write_all(buf.as_slice())
1056 .await
1057 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1058 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1059 .await
1060 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1061 send.flush()
1062 .await
1063 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1064 Ok(())
1065 }
1066
1067 async fn send_cmd_by_specify_tunnel_with_resp(
1068 &self,
1069 peer_id: &PeerId,
1070 tunnel_id: TunnelId,
1071 cmd: CMD,
1072 version: u8,
1073 mut body: CmdBody,
1074 timeout: Duration,
1075 ) -> CmdResult<CmdBody> {
1076 let conn = self.peer_manager.find_connection(tunnel_id);
1077 if conn.is_none() {
1078 return Err(cmd_err!(
1079 CmdErrorCode::PeerConnectionNotFound,
1080 "tunnel_id: {:?}",
1081 tunnel_id
1082 ));
1083 }
1084 let conn = conn.unwrap();
1085 if let Some(id) = tokio::task::try_id() {
1086 if self.state_holder.has_state(id) {
1087 return Err(cmd_err!(
1088 CmdErrorCode::Failed,
1089 "can't send msg with resp in tunnel {:?} msg handle",
1090 conn.conn_id
1091 ));
1092 }
1093 }
1094 assert_eq!(tunnel_id, conn.conn_id);
1095 log::debug!(
1096 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1097 peer_id,
1098 conn.conn_id,
1099 cmd,
1100 body.len()
1101 );
1102 let seq = gen_seq();
1103 let header = CmdHeader::<LEN, CMD>::new(
1104 version,
1105 false,
1106 Some(seq),
1107 cmd,
1108 LEN::from_u64(body.len()).unwrap(),
1109 );
1110 let buf = header
1111 .to_vec()
1112 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1113 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1114 let waiter = self
1115 .resp_waiter
1116 .create_timeout_result_future(resp_id, timeout)
1117 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1118 {
1119 let mut send = conn.send.get().await;
1120 if buf.len() > 255 {
1121 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1122 }
1123 send.write_u8(buf.len() as u8)
1124 .await
1125 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1126 send.write_all(buf.as_slice())
1127 .await
1128 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1129 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1130 .await
1131 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1132 send.flush()
1133 .await
1134 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1135 }
1136 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1137 Ok(body)
1138 }
1139
1140 async fn send_by_all_tunnels(
1141 &self,
1142 peer_id: &PeerId,
1143 cmd: CMD,
1144 version: u8,
1145 body: &[u8],
1146 ) -> CmdResult<()> {
1147 let connections = self.peer_manager.find_connections(peer_id);
1148 for conn in connections {
1149 let _ret: CmdResult<()> = async move {
1150 let header = CmdHeader::<LEN, CMD>::new(
1151 version,
1152 false,
1153 None,
1154 cmd,
1155 LEN::from_u64(body.len() as u64).unwrap(),
1156 );
1157 let buf = header
1158 .to_vec()
1159 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1160 let mut send = conn.send.get().await;
1161 send.write_u8(buf.len() as u8)
1162 .await
1163 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1164 send.write_all(buf.as_slice())
1165 .await
1166 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1167 send.write_all(body)
1168 .await
1169 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1170 send.flush()
1171 .await
1172 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1173 Ok(())
1174 }
1175 .await;
1176 }
1177 Ok(())
1178 }
1179
1180 async fn send2_by_all_tunnels(
1181 &self,
1182 peer_id: &PeerId,
1183 cmd: CMD,
1184 version: u8,
1185 body: &[&[u8]],
1186 ) -> CmdResult<()> {
1187 let connections = self.peer_manager.find_connections(peer_id);
1188 let mut len = 0;
1189 for b in body.iter() {
1190 len += b.len();
1191 }
1192 for conn in connections {
1193 let _ret: CmdResult<()> = async move {
1194 let header = CmdHeader::<LEN, CMD>::new(
1195 version,
1196 false,
1197 None,
1198 cmd,
1199 LEN::from_u64(len as u64).unwrap(),
1200 );
1201 let buf = header
1202 .to_vec()
1203 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1204 if buf.len() > 255 {
1205 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1206 }
1207 let mut send = conn.send.get().await;
1208 send.write_u8(buf.len() as u8)
1209 .await
1210 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1211 send.write_all(buf.as_slice())
1212 .await
1213 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1214 for b in body.iter() {
1215 send.write_all(b)
1216 .await
1217 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1218 }
1219 send.flush()
1220 .await
1221 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1222 Ok(())
1223 }
1224 .await;
1225 }
1226 Ok(())
1227 }
1228}
1229
1230pub struct DefaultCmdServerIncoming<
1231 M: CmdTunnelMeta,
1232 R: CmdTunnelRead<M>,
1233 W: CmdTunnelWrite<M>,
1234 LISTENER,
1235> {
1236 tunnel_listener: LISTENER,
1237 tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1238 _p: PhantomData<fn() -> (M, R, W)>,
1239}
1240
1241impl<
1242 M: CmdTunnelMeta,
1243 R: CmdTunnelRead<M>,
1244 W: CmdTunnelWrite<M>,
1245 LISTENER: CmdTunnelListener<M, R, W>,
1246> DefaultCmdServerIncoming<M, R, W, LISTENER>
1247{
1248 pub fn new(
1249 tunnel_listener: LISTENER,
1250 tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1251 ) -> Arc<Self> {
1252 Arc::new(Self {
1253 tunnel_listener,
1254 tunnel_service,
1255 _p: PhantomData,
1256 })
1257 }
1258
1259 pub fn start(self: &Arc<Self>) {
1260 let this = self.clone();
1261 tokio::spawn(async move {
1262 if let Err(e) = this.run().await {
1263 log::error!("cmd server error: {:?}", e);
1264 }
1265 });
1266 }
1267
1268 pub async fn run(&self) -> CmdResult<()> {
1269 loop {
1270 let tunnel = self.tunnel_listener.accept().await?;
1271 let tunnel_service = self.tunnel_service.clone();
1272 tokio::spawn(async move {
1273 if let Err(e) = tunnel_service.handle_tunnel(tunnel).await {
1274 log::error!("peer connection error: {:?}", e);
1275 }
1276 });
1277 }
1278 }
1279}
1280
1281pub struct DefaultCmdServer<
1282 M: CmdTunnelMeta,
1283 R: CmdTunnelRead<M>,
1284 W: CmdTunnelWrite<M>,
1285 LEN,
1286 CMD,
1287 LISTENER,
1288> {
1289 incoming: Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>>,
1290 service: Arc<DefaultCmdServerService<M, R, W, LEN, CMD>>,
1291}
1292
1293impl<
1294 M: CmdTunnelMeta,
1295 R: CmdTunnelRead<M>,
1296 W: CmdTunnelWrite<M>,
1297 LEN: RawEncode
1298 + for<'a> RawDecode<'a>
1299 + Copy
1300 + RawFixedBytes
1301 + Sync
1302 + Send
1303 + 'static
1304 + FromPrimitive
1305 + ToPrimitive,
1306 CMD: RawEncode
1307 + for<'a> RawDecode<'a>
1308 + Copy
1309 + RawFixedBytes
1310 + Sync
1311 + Send
1312 + 'static
1313 + Eq
1314 + Hash
1315 + Debug,
1316 LISTENER: CmdTunnelListener<M, R, W>,
1317> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1318{
1319 pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
1320 let service = DefaultCmdServerService::new();
1321 let incoming = DefaultCmdServerIncoming::new(tunnel_listener, service.clone());
1322 Arc::new(Self { incoming, service })
1323 }
1324
1325 pub fn incoming(&self) -> Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>> {
1326 self.incoming.clone()
1327 }
1328
1329 pub fn service(&self) -> Arc<DefaultCmdServerService<M, R, W, LEN, CMD>> {
1330 self.service.clone()
1331 }
1332
1333 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
1334 self.service.attach_event_listener(event_listener);
1335 }
1336
1337 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
1338 self.service.get_peer_tunnels(peer_id).await
1339 }
1340
1341 pub fn start(self: &Arc<Self>) {
1342 self.incoming.start();
1343 }
1344}
1345
1346impl<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> Deref
1347 for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1348{
1349 type Target = DefaultCmdServerService<M, R, W, LEN, CMD>;
1350
1351 fn deref(&self) -> &Self::Target {
1352 self.service.as_ref()
1353 }
1354}
1355
1356#[async_trait::async_trait]
1357impl<
1358 M: CmdTunnelMeta,
1359 R: CmdTunnelRead<M>,
1360 W: CmdTunnelWrite<M>,
1361 LEN: RawEncode
1362 + for<'a> RawDecode<'a>
1363 + Copy
1364 + RawFixedBytes
1365 + Sync
1366 + Send
1367 + 'static
1368 + FromPrimitive
1369 + ToPrimitive,
1370 CMD: RawEncode
1371 + for<'a> RawDecode<'a>
1372 + Copy
1373 + RawFixedBytes
1374 + Sync
1375 + Send
1376 + 'static
1377 + Eq
1378 + Hash
1379 + Debug,
1380 LISTENER: CmdTunnelListener<M, R, W>,
1381> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1382{
1383 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
1384 self.service.register_cmd_handler(cmd, handler);
1385 }
1386
1387 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
1388 self.service.send(peer_id, cmd, version, body).await
1389 }
1390
1391 async fn send_with_resp(
1392 &self,
1393 peer_id: &PeerId,
1394 cmd: CMD,
1395 version: u8,
1396 body: &[u8],
1397 timeout: Duration,
1398 ) -> CmdResult<CmdBody> {
1399 self.service
1400 .send_with_resp(peer_id, cmd, version, body, timeout)
1401 .await
1402 }
1403
1404 async fn send2(
1405 &self,
1406 peer_id: &PeerId,
1407 cmd: CMD,
1408 version: u8,
1409 body: &[&[u8]],
1410 ) -> CmdResult<()> {
1411 self.service.send2(peer_id, cmd, version, body).await
1412 }
1413
1414 async fn send2_with_resp(
1415 &self,
1416 peer_id: &PeerId,
1417 cmd: CMD,
1418 version: u8,
1419 body: &[&[u8]],
1420 timeout: Duration,
1421 ) -> CmdResult<CmdBody> {
1422 self.service
1423 .send2_with_resp(peer_id, cmd, version, body, timeout)
1424 .await
1425 }
1426
1427 async fn send_cmd(
1428 &self,
1429 peer_id: &PeerId,
1430 cmd: CMD,
1431 version: u8,
1432 body: CmdBody,
1433 ) -> CmdResult<()> {
1434 self.service.send_cmd(peer_id, cmd, version, body).await
1435 }
1436
1437 async fn send_cmd_with_resp(
1438 &self,
1439 peer_id: &PeerId,
1440 cmd: CMD,
1441 version: u8,
1442 body: CmdBody,
1443 timeout: Duration,
1444 ) -> CmdResult<CmdBody> {
1445 self.service
1446 .send_cmd_with_resp(peer_id, cmd, version, body, timeout)
1447 .await
1448 }
1449
1450 async fn send_by_specify_tunnel(
1451 &self,
1452 peer_id: &PeerId,
1453 tunnel_id: TunnelId,
1454 cmd: CMD,
1455 version: u8,
1456 body: &[u8],
1457 ) -> CmdResult<()> {
1458 self.service
1459 .send_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1460 .await
1461 }
1462
1463 async fn send_by_specify_tunnel_with_resp(
1464 &self,
1465 peer_id: &PeerId,
1466 tunnel_id: TunnelId,
1467 cmd: CMD,
1468 version: u8,
1469 body: &[u8],
1470 timeout: Duration,
1471 ) -> CmdResult<CmdBody> {
1472 self.service
1473 .send_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1474 .await
1475 }
1476
1477 async fn send2_by_specify_tunnel(
1478 &self,
1479 peer_id: &PeerId,
1480 tunnel_id: TunnelId,
1481 cmd: CMD,
1482 version: u8,
1483 body: &[&[u8]],
1484 ) -> CmdResult<()> {
1485 self.service
1486 .send2_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1487 .await
1488 }
1489
1490 async fn send2_by_specify_tunnel_with_resp(
1491 &self,
1492 peer_id: &PeerId,
1493 tunnel_id: TunnelId,
1494 cmd: CMD,
1495 version: u8,
1496 body: &[&[u8]],
1497 timeout: Duration,
1498 ) -> CmdResult<CmdBody> {
1499 self.service
1500 .send2_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1501 .await
1502 }
1503
1504 async fn send_cmd_by_specify_tunnel(
1505 &self,
1506 peer_id: &PeerId,
1507 tunnel_id: TunnelId,
1508 cmd: CMD,
1509 version: u8,
1510 body: CmdBody,
1511 ) -> CmdResult<()> {
1512 self.service
1513 .send_cmd_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1514 .await
1515 }
1516
1517 async fn send_cmd_by_specify_tunnel_with_resp(
1518 &self,
1519 peer_id: &PeerId,
1520 tunnel_id: TunnelId,
1521 cmd: CMD,
1522 version: u8,
1523 body: CmdBody,
1524 timeout: Duration,
1525 ) -> CmdResult<CmdBody> {
1526 self.service
1527 .send_cmd_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1528 .await
1529 }
1530
1531 async fn send_by_all_tunnels(
1532 &self,
1533 peer_id: &PeerId,
1534 cmd: CMD,
1535 version: u8,
1536 body: &[u8],
1537 ) -> CmdResult<()> {
1538 self.service
1539 .send_by_all_tunnels(peer_id, cmd, version, body)
1540 .await
1541 }
1542
1543 async fn send2_by_all_tunnels(
1544 &self,
1545 peer_id: &PeerId,
1546 cmd: CMD,
1547 version: u8,
1548 body: &[&[u8]],
1549 ) -> CmdResult<()> {
1550 self.service
1551 .send2_by_all_tunnels(peer_id, cmd, version, body)
1552 .await
1553 }
1554}