1use super::CmdServer;
2use super::peer_manager::{PeerManager, PeerManagerRef};
3use crate::client::{RespWaiter, RespWaiterRef, gen_resp_id, gen_seq};
4use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
5use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
6use crate::peer_connection::PeerConnection;
7use crate::peer_id::PeerId;
8use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
9use async_named_locker::{NamedStateHolder, ObjectHolder};
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_split::Splittable;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::marker::PhantomData;
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
20
21#[async_trait::async_trait]
22pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
23 Send + Sync + 'static
24{
25 async fn accept(&self) -> CmdResult<Splittable<R, W>>;
26}
27
28#[async_trait::async_trait]
29pub trait CmdTunnelService<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
30 Send + Sync + 'static
31{
32 async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()>;
33}
34
35#[async_trait::async_trait]
36pub trait CmdServerEventListener: Send + Sync + 'static {
37 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
38 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
39}
40
41#[derive(Clone)]
42struct CmdServerEventListenerEmit {
43 listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
44}
45
46impl CmdServerEventListenerEmit {
47 pub fn new() -> Self {
48 Self {
49 listeners: Arc::new(Mutex::new(Vec::new())),
50 }
51 }
52
53 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
54 self.listeners.lock().unwrap().push(event_listener);
55 }
56}
57
58#[async_trait::async_trait]
59impl CmdServerEventListener for CmdServerEventListenerEmit {
60 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
61 let listeners = { self.listeners.lock().unwrap().clone() };
62 for listener in listeners.iter() {
63 if let Err(e) = listener.on_peer_connected(peer_id).await {
64 log::error!("on_peer_connected error: {:?}", e);
65 }
66 }
67 Ok(())
68 }
69
70 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
71 let listeners = { self.listeners.lock().unwrap().clone() };
72 for listener in listeners.iter() {
73 if let Err(e) = listener.on_peer_disconnected(peer_id).await {
74 log::error!("on_peer_disconnected error: {:?}", e);
75 }
76 }
77 Ok(())
78 }
79}
80
81pub struct DefaultCmdServerService<
82 M: CmdTunnelMeta,
83 R: CmdTunnelRead<M>,
84 W: CmdTunnelWrite<M>,
85 LEN,
86 CMD,
87> {
88 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
89 peer_manager: PeerManagerRef<M, R, W>,
90 event_emit: CmdServerEventListenerEmit,
91 resp_waiter: RespWaiterRef,
92 state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
93 _p: PhantomData<fn() -> (M, R, W, LEN, CMD)>,
94}
95
96impl<
97 M: CmdTunnelMeta,
98 R: CmdTunnelRead<M>,
99 W: CmdTunnelWrite<M>,
100 LEN: RawEncode
101 + for<'a> RawDecode<'a>
102 + Copy
103 + RawFixedBytes
104 + Sync
105 + Send
106 + 'static
107 + FromPrimitive
108 + ToPrimitive,
109 CMD: RawEncode
110 + for<'a> RawDecode<'a>
111 + Copy
112 + RawFixedBytes
113 + Sync
114 + Send
115 + 'static
116 + Eq
117 + Hash
118 + Debug,
119> DefaultCmdServerService<M, R, W, LEN, CMD>
120{
121 pub fn new() -> Arc<Self> {
122 let event_emit = CmdServerEventListenerEmit::new();
123 Arc::new(Self {
124 cmd_handler_map: Arc::new(CmdHandlerMap::new()),
125 peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
126 event_emit,
127 resp_waiter: Arc::new(RespWaiter::new()),
128 state_holder: NamedStateHolder::new(),
129 _p: PhantomData,
130 })
131 }
132
133 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
134 self.event_emit.attach_event_listener(event_listener);
135 }
136
137 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
138 self.peer_manager.find_connections(peer_id)
139 }
140
141 pub async fn serve_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
142 let peer_id = tunnel.get_remote_peer_id();
143 let tunnel_id = self.peer_manager.generate_conn_id();
144 let cmd_handler_map = self.cmd_handler_map.clone();
145 let resp_waiter = self.resp_waiter.clone();
146 let state_holder = self.state_holder.clone();
147 let (mut reader, writer) = tunnel.split();
148 let writer = ObjectHolder::new(writer);
149 let resp_write = writer.clone();
150 let remote_id = peer_id.clone();
151 let recv_handle = tokio::spawn(async move {
152 let ret: CmdResult<()> = async move {
153 loop {
154 let header_len = reader
155 .read_u8()
156 .await
157 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
158 let mut header = vec![0u8; header_len as usize];
159 let n = reader
160 .read_exact(&mut header)
161 .await
162 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
163 if n == 0 {
164 break;
165 }
166 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
167 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
168 sfo_log::debug!("recv cmd {:?}", header.cmd_code());
169 let body_len = header.pkg_len().to_u64().unwrap();
170 let cmd_read = CmdBodyRead::new(reader, body_len as usize);
171 let waiter = cmd_read.get_waiter();
172 let future = waiter
173 .create_result_future()
174 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
175 {
176 let body = CmdBody::from_reader(BufReader::new(cmd_read), body_len);
177 if header.is_resp() && header.seq().is_some() {
178 let resp_id =
179 gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
180 let _ = resp_waiter.set_result(resp_id, body);
181 } else if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
182 let version = header.version();
183 let seq = header.seq();
184 let cmd_code = header.cmd_code();
185 match {
186 let _handle_state = state_holder.new_state(tokio::task::id());
187 handler
188 .handle(remote_id.clone(), tunnel_id, header, body)
189 .await
190 } {
191 Ok(Some(mut body)) => {
192 let mut write = resp_write.get().await;
193 let header = CmdHeader::<LEN, CMD>::new(
194 version,
195 true,
196 seq,
197 cmd_code,
198 LEN::from_u64(body.len()).unwrap(),
199 );
200 let buf = header
201 .to_vec()
202 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
203 if buf.len() > 255 {
204 return Err(cmd_err!(
205 CmdErrorCode::RawCodecError,
206 "header len too large"
207 ));
208 }
209 write
210 .write_u8(buf.len() as u8)
211 .await
212 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
213 write
214 .write_all(buf.as_slice())
215 .await
216 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
217 tokio::io::copy(&mut body, write.deref_mut().deref_mut())
218 .await
219 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
220 write
221 .flush()
222 .await
223 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
224 }
225 Err(e) => {
226 log::error!("handle cmd error: {:?}", e);
227 }
228 Ok(None) => {}
229 }
230 }
231 }
232 reader = future
233 .await
234 .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
235 }
236 Ok(())
237 }
238 .await;
239 if let Err(e) = ret.as_ref() {
240 log::error!("recv cmd error: {:?}", e);
241 }
242 ret
243 });
244
245 let peer_conn = PeerConnection {
246 conn_id: tunnel_id,
247 peer_id,
248 send: writer,
249 handle: Some(recv_handle),
250 };
251 self.peer_manager.add_peer_connection(peer_conn).await;
252 Ok(())
253 }
254}
255
256#[async_trait::async_trait]
257impl<
258 M: CmdTunnelMeta,
259 R: CmdTunnelRead<M>,
260 W: CmdTunnelWrite<M>,
261 LEN: RawEncode
262 + for<'a> RawDecode<'a>
263 + Copy
264 + RawFixedBytes
265 + Sync
266 + Send
267 + 'static
268 + FromPrimitive
269 + ToPrimitive,
270 CMD: RawEncode
271 + for<'a> RawDecode<'a>
272 + Copy
273 + RawFixedBytes
274 + Sync
275 + Send
276 + 'static
277 + Eq
278 + Hash
279 + Debug,
280> CmdTunnelService<M, R, W> for DefaultCmdServerService<M, R, W, LEN, CMD>
281{
282 async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
283 self.serve_tunnel(tunnel).await
284 }
285}
286
287#[async_trait::async_trait]
288impl<
289 M: CmdTunnelMeta,
290 R: CmdTunnelRead<M>,
291 W: CmdTunnelWrite<M>,
292 LEN: RawEncode
293 + for<'a> RawDecode<'a>
294 + Copy
295 + RawFixedBytes
296 + Sync
297 + Send
298 + 'static
299 + FromPrimitive
300 + ToPrimitive,
301 CMD: RawEncode
302 + for<'a> RawDecode<'a>
303 + Copy
304 + RawFixedBytes
305 + Sync
306 + Send
307 + 'static
308 + Eq
309 + Hash
310 + Debug,
311> CmdServer<LEN, CMD> for DefaultCmdServerService<M, R, W, LEN, CMD>
312{
313 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
314 self.cmd_handler_map.insert(cmd, handler);
315 }
316
317 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
318 let connections = self.peer_manager.find_connections(peer_id);
319 for conn in connections {
320 let ret: CmdResult<()> = async move {
321 log::debug!(
322 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
323 peer_id,
324 conn.conn_id,
325 cmd,
326 body.len(),
327 hex::encode(body)
328 );
329 let header = CmdHeader::<LEN, CMD>::new(
330 version,
331 false,
332 None,
333 cmd,
334 LEN::from_u64(body.len() as u64).unwrap(),
335 );
336 let buf = header
337 .to_vec()
338 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
339 let mut send = conn.send.get().await;
340 if buf.len() > 255 {
341 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
342 }
343 send.write_u8(buf.len() as u8)
344 .await
345 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
346 send.write_all(buf.as_slice())
347 .await
348 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
349 send.write_all(body)
350 .await
351 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
352 send.flush()
353 .await
354 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
355 Ok(())
356 }
357 .await;
358 if ret.is_ok() {
359 break;
360 }
361 }
362 Ok(())
363 }
364
365 async fn send_with_resp(
366 &self,
367 peer_id: &PeerId,
368 cmd: CMD,
369 version: u8,
370 body: &[u8],
371 timeout: Duration,
372 ) -> CmdResult<CmdBody> {
373 let connections = self.peer_manager.find_connections(peer_id);
374 for conn in connections {
375 if let Some(id) = tokio::task::try_id() {
376 if self.state_holder.has_state(id) {
377 continue;
378 }
379 }
380 let ret: CmdResult<CmdBody> = async move {
381 log::debug!(
382 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
383 peer_id,
384 conn.conn_id,
385 cmd,
386 body.len(),
387 hex::encode(body)
388 );
389 let seq = gen_seq();
390 let header = CmdHeader::<LEN, CMD>::new(
391 version,
392 false,
393 Some(seq),
394 cmd,
395 LEN::from_u64(body.len() as u64).unwrap(),
396 );
397 let buf = header
398 .to_vec()
399 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
400 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
401 let waiter = self
402 .resp_waiter
403 .create_timeout_result_future(resp_id, timeout)
404 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
405 {
406 let mut send = conn.send.get().await;
407 if buf.len() > 255 {
408 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
409 }
410 send.write_u8(buf.len() as u8)
411 .await
412 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
413 send.write_all(buf.as_slice())
414 .await
415 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
416 send.write_all(body)
417 .await
418 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
419 send.flush()
420 .await
421 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
422 }
423 let body =
424 waiter
425 .await
426 .map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
427 Ok(body)
428 }
429 .await;
430 if ret.is_ok() {
431 return ret;
432 } else {
433 sfo_log::error!("send err {:?}", ret.unwrap_err());
434 }
435 }
436 Err(cmd_err!(
437 CmdErrorCode::Failed,
438 "send to peer_id: {}",
439 peer_id
440 ))
441 }
442
443 async fn send2(
444 &self,
445 peer_id: &PeerId,
446 cmd: CMD,
447 version: u8,
448 body: &[&[u8]],
449 ) -> CmdResult<()> {
450 let connections = self.peer_manager.find_connections(peer_id);
451 for conn in connections {
452 let ret: CmdResult<()> = async move {
453 let mut len = 0;
454 for b in body.iter() {
455 len += b.len();
456 log::debug!(
457 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
458 peer_id,
459 conn.conn_id,
460 cmd,
461 hex::encode(b)
462 );
463 }
464 log::debug!(
465 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
466 peer_id,
467 conn.conn_id,
468 cmd,
469 len
470 );
471 let header = CmdHeader::<LEN, CMD>::new(
472 version,
473 false,
474 None,
475 cmd,
476 LEN::from_u64(len as u64).unwrap(),
477 );
478 let buf = header
479 .to_vec()
480 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
481 let mut send = conn.send.get().await;
482 if buf.len() > 255 {
483 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
484 }
485 send.write_u8(buf.len() as u8)
486 .await
487 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
488 send.write_all(buf.as_slice())
489 .await
490 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
491 for b in body.iter() {
492 send.write_all(b)
493 .await
494 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
495 }
496 send.flush()
497 .await
498 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
499 Ok(())
500 }
501 .await;
502 if ret.is_ok() {
503 break;
504 }
505 }
506 Ok(())
507 }
508
509 async fn send2_with_resp(
510 &self,
511 peer_id: &PeerId,
512 cmd: CMD,
513 version: u8,
514 body: &[&[u8]],
515 timeout: Duration,
516 ) -> CmdResult<CmdBody> {
517 let connections = self.peer_manager.find_connections(peer_id);
518 for conn in connections {
519 if let Some(id) = tokio::task::try_id() {
520 if self.state_holder.has_state(id) {
521 continue;
522 }
523 }
524 let ret: CmdResult<CmdBody> = async move {
525 let mut len = 0;
526 for b in body.iter() {
527 len += b.len();
528 log::debug!(
529 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
530 peer_id,
531 conn.conn_id,
532 cmd,
533 hex::encode(b)
534 );
535 }
536 log::debug!(
537 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
538 peer_id,
539 conn.conn_id,
540 cmd,
541 len
542 );
543 let seq = gen_seq();
544 let header = CmdHeader::<LEN, CMD>::new(
545 version,
546 false,
547 Some(seq),
548 cmd,
549 LEN::from_u64(len as u64).unwrap(),
550 );
551 let buf = header
552 .to_vec()
553 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
554 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
555 let waiter = self
556 .resp_waiter
557 .create_timeout_result_future(resp_id, timeout)
558 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
559 {
560 let mut send = conn.send.get().await;
561 if buf.len() > 255 {
562 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
563 }
564 send.write_u8(buf.len() as u8)
565 .await
566 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
567 send.write_all(buf.as_slice())
568 .await
569 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
570 for b in body.iter() {
571 send.write_all(b)
572 .await
573 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
574 }
575 send.flush()
576 .await
577 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
578 }
579 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
580 Ok(body)
581 }
582 .await;
583 if ret.is_ok() {
584 return ret;
585 }
586 }
587 Err(cmd_err!(
588 CmdErrorCode::Failed,
589 "send to peer_id: {}",
590 peer_id
591 ))
592 }
593
594 async fn send_cmd(
595 &self,
596 peer_id: &PeerId,
597 cmd: CMD,
598 version: u8,
599 body: CmdBody,
600 ) -> CmdResult<()> {
601 let body_data = body.into_bytes().await?;
602 let body = body_data.as_slice();
603 let connections = self.peer_manager.find_connections(peer_id);
604 for conn in connections {
605 let ret: CmdResult<()> = async move {
606 log::debug!(
607 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
608 peer_id,
609 conn.conn_id,
610 cmd,
611 body.len(),
612 hex::encode(body)
613 );
614 let header = CmdHeader::<LEN, CMD>::new(
615 version,
616 false,
617 None,
618 cmd,
619 LEN::from_u64(body.len() as u64).unwrap(),
620 );
621 let buf = header
622 .to_vec()
623 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
624 let mut send = conn.send.get().await;
625 if buf.len() > 255 {
626 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
627 }
628 send.write_u8(buf.len() as u8)
629 .await
630 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
631 send.write_all(buf.as_slice())
632 .await
633 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
634 send.write_all(body)
635 .await
636 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
637 send.flush()
638 .await
639 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
640 Ok(())
641 }
642 .await;
643 if ret.is_ok() {
644 break;
645 }
646 }
647 Ok(())
648 }
649
650 async fn send_cmd_with_resp(
651 &self,
652 peer_id: &PeerId,
653 cmd: CMD,
654 version: u8,
655 body: CmdBody,
656 timeout: Duration,
657 ) -> CmdResult<CmdBody> {
658 let connections = self.peer_manager.find_connections(peer_id);
659 let body_data = body.into_bytes().await?;
660 let data_ref = body_data.as_slice();
661 for conn in connections {
662 if let Some(id) = tokio::task::try_id() {
663 if self.state_holder.has_state(id) {
664 continue;
665 }
666 }
667 let ret: CmdResult<CmdBody> = async move {
668 log::debug!(
669 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}",
670 peer_id,
671 conn.conn_id,
672 cmd,
673 data_ref.len()
674 );
675 let seq = gen_seq();
676 let header = CmdHeader::<LEN, CMD>::new(
677 version,
678 false,
679 Some(seq),
680 cmd,
681 LEN::from_u64(data_ref.len() as u64).unwrap(),
682 );
683 let buf = header
684 .to_vec()
685 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
686 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
687 let waiter = self
688 .resp_waiter
689 .create_timeout_result_future(resp_id, timeout)
690 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
691 {
692 let mut send = conn.send.get().await;
693 if buf.len() > 255 {
694 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
695 }
696 send.write_u8(buf.len() as u8)
697 .await
698 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
699 send.write_all(buf.as_slice())
700 .await
701 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
702 send.write_all(data_ref)
703 .await
704 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
705 send.flush()
706 .await
707 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
708 }
709 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
710 Ok(body)
711 }
712 .await;
713 if ret.is_ok() {
714 return ret;
715 }
716 }
717 Err(cmd_err!(
718 CmdErrorCode::Failed,
719 "send to peer_id: {}",
720 peer_id
721 ))
722 }
723
724 async fn send_by_specify_tunnel(
725 &self,
726 peer_id: &PeerId,
727 tunnel_id: TunnelId,
728 cmd: CMD,
729 version: u8,
730 body: &[u8],
731 ) -> CmdResult<()> {
732 let conn = self.peer_manager.find_connection(tunnel_id);
733 if conn.is_none() {
734 return Err(cmd_err!(
735 CmdErrorCode::PeerConnectionNotFound,
736 "tunnel_id: {:?}",
737 tunnel_id
738 ));
739 }
740 let conn = conn.unwrap();
741 assert_eq!(tunnel_id, conn.conn_id);
742 log::trace!(
743 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
744 peer_id,
745 conn.conn_id,
746 cmd,
747 body.len(),
748 hex::encode(body)
749 );
750 let header = CmdHeader::<LEN, CMD>::new(
751 version,
752 false,
753 None,
754 cmd,
755 LEN::from_u64(body.len() as u64).unwrap(),
756 );
757 let buf = header
758 .to_vec()
759 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
760 let mut send = conn.send.get().await;
761 if buf.len() > 255 {
762 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
763 }
764 send.write_u8(buf.len() as u8)
765 .await
766 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
767 send.write_all(buf.as_slice())
768 .await
769 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
770 send.write_all(body)
771 .await
772 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
773 send.flush()
774 .await
775 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
776 Ok(())
777 }
778
779 async fn send_by_specify_tunnel_with_resp(
780 &self,
781 peer_id: &PeerId,
782 tunnel_id: TunnelId,
783 cmd: CMD,
784 version: u8,
785 body: &[u8],
786 timeout: Duration,
787 ) -> CmdResult<CmdBody> {
788 let conn = self.peer_manager.find_connection(tunnel_id);
789 if conn.is_none() {
790 return Err(cmd_err!(
791 CmdErrorCode::PeerConnectionNotFound,
792 "tunnel_id: {:?}",
793 tunnel_id
794 ));
795 }
796 let conn = conn.unwrap();
797 if let Some(id) = tokio::task::try_id() {
798 if self.state_holder.has_state(id) {
799 return Err(cmd_err!(
800 CmdErrorCode::Failed,
801 "can't send msg with resp in tunnel {:?} msg handle",
802 conn.conn_id
803 ));
804 }
805 }
806 assert_eq!(tunnel_id, conn.conn_id);
807 log::trace!(
808 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
809 peer_id,
810 conn.conn_id,
811 cmd,
812 body.len(),
813 hex::encode(body)
814 );
815 let seq = gen_seq();
816 let header = CmdHeader::<LEN, CMD>::new(
817 version,
818 false,
819 Some(seq),
820 cmd,
821 LEN::from_u64(body.len() as u64).unwrap(),
822 );
823 let buf = header
824 .to_vec()
825 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
826 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
827 let waiter = self
828 .resp_waiter
829 .create_timeout_result_future(resp_id, timeout)
830 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
831 {
832 let mut send = conn.send.get().await;
833 if buf.len() > 255 {
834 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
835 }
836 send.write_u8(buf.len() as u8)
837 .await
838 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
839 send.write_all(buf.as_slice())
840 .await
841 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
842 send.write_all(body)
843 .await
844 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
845 send.flush()
846 .await
847 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
848 }
849 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
850 Ok(body)
851 }
852
853 async fn send2_by_specify_tunnel(
854 &self,
855 peer_id: &PeerId,
856 tunnel_id: TunnelId,
857 cmd: CMD,
858 version: u8,
859 body: &[&[u8]],
860 ) -> CmdResult<()> {
861 let conn = self.peer_manager.find_connection(tunnel_id);
862 if conn.is_none() {
863 return Err(cmd_err!(
864 CmdErrorCode::PeerConnectionNotFound,
865 "tunnel_id: {:?}",
866 tunnel_id
867 ));
868 }
869 let conn = conn.unwrap();
870 assert_eq!(tunnel_id, conn.conn_id);
871 let mut len = 0;
872 for b in body.iter() {
873 len += b.len();
874 log::debug!(
875 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
876 peer_id,
877 conn.conn_id,
878 cmd,
879 hex::encode(b)
880 );
881 }
882 log::debug!(
883 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
884 peer_id,
885 conn.conn_id,
886 cmd,
887 len
888 );
889 let header = CmdHeader::<LEN, CMD>::new(
890 version,
891 false,
892 None,
893 cmd,
894 LEN::from_u64(len as u64).unwrap(),
895 );
896 let buf = header
897 .to_vec()
898 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
899 if buf.len() > 255 {
900 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
901 }
902 let mut send = conn.send.get().await;
903 send.write_u8(buf.len() as u8)
904 .await
905 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
906 send.write_all(buf.as_slice())
907 .await
908 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
909 for b in body.iter() {
910 send.write_all(b)
911 .await
912 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
913 }
914 send.flush()
915 .await
916 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
917 Ok(())
918 }
919
920 async fn send2_by_specify_tunnel_with_resp(
921 &self,
922 peer_id: &PeerId,
923 tunnel_id: TunnelId,
924 cmd: CMD,
925 version: u8,
926 body: &[&[u8]],
927 timeout: Duration,
928 ) -> CmdResult<CmdBody> {
929 let conn = self.peer_manager.find_connection(tunnel_id);
930 if conn.is_none() {
931 return Err(cmd_err!(
932 CmdErrorCode::PeerConnectionNotFound,
933 "tunnel_id: {:?}",
934 tunnel_id
935 ));
936 }
937 let conn = conn.unwrap();
938 if let Some(id) = tokio::task::try_id() {
939 if self.state_holder.has_state(id) {
940 return Err(cmd_err!(
941 CmdErrorCode::Failed,
942 "can't send msg with resp in tunnel {:?} msg handle",
943 conn.conn_id
944 ));
945 }
946 }
947 assert_eq!(tunnel_id, conn.conn_id);
948 let mut len = 0;
949 for b in body.iter() {
950 len += b.len();
951 log::debug!(
952 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
953 peer_id,
954 conn.conn_id,
955 cmd,
956 hex::encode(b)
957 );
958 }
959 log::debug!(
960 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
961 peer_id,
962 conn.conn_id,
963 cmd,
964 len
965 );
966 let seq = gen_seq();
967 let header = CmdHeader::<LEN, CMD>::new(
968 version,
969 false,
970 Some(seq),
971 cmd,
972 LEN::from_u64(len as u64).unwrap(),
973 );
974 let buf = header
975 .to_vec()
976 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
977 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
978 let waiter = self
979 .resp_waiter
980 .create_timeout_result_future(resp_id, timeout)
981 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
982 if buf.len() > 255 {
983 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
984 }
985 {
986 let mut send = conn.send.get().await;
987 send.write_u8(buf.len() as u8)
988 .await
989 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
990 send.write_all(buf.as_slice())
991 .await
992 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
993 for b in body.iter() {
994 send.write_all(b)
995 .await
996 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
997 }
998 send.flush()
999 .await
1000 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1001 }
1002 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1003 Ok(body)
1004 }
1005
1006 async fn send_cmd_by_specify_tunnel(
1007 &self,
1008 peer_id: &PeerId,
1009 tunnel_id: TunnelId,
1010 cmd: CMD,
1011 version: u8,
1012 mut body: CmdBody,
1013 ) -> CmdResult<()> {
1014 let conn = self.peer_manager.find_connection(tunnel_id);
1015 if conn.is_none() {
1016 return Err(cmd_err!(
1017 CmdErrorCode::PeerConnectionNotFound,
1018 "tunnel_id: {:?}",
1019 tunnel_id
1020 ));
1021 }
1022 let conn = conn.unwrap();
1023 assert_eq!(tunnel_id, conn.conn_id);
1024 log::debug!(
1025 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1026 peer_id,
1027 conn.conn_id,
1028 cmd,
1029 body.len()
1030 );
1031 let header = CmdHeader::<LEN, CMD>::new(
1032 version,
1033 false,
1034 None,
1035 cmd,
1036 LEN::from_u64(body.len()).unwrap(),
1037 );
1038 let buf = header
1039 .to_vec()
1040 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1041 let mut send = conn.send.get().await;
1042 if buf.len() > 255 {
1043 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1044 }
1045 send.write_u8(buf.len() as u8)
1046 .await
1047 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1048 send.write_all(buf.as_slice())
1049 .await
1050 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1051 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1052 .await
1053 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1054 send.flush()
1055 .await
1056 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1057 Ok(())
1058 }
1059
1060 async fn send_cmd_by_specify_tunnel_with_resp(
1061 &self,
1062 peer_id: &PeerId,
1063 tunnel_id: TunnelId,
1064 cmd: CMD,
1065 version: u8,
1066 mut body: CmdBody,
1067 timeout: Duration,
1068 ) -> CmdResult<CmdBody> {
1069 let conn = self.peer_manager.find_connection(tunnel_id);
1070 if conn.is_none() {
1071 return Err(cmd_err!(
1072 CmdErrorCode::PeerConnectionNotFound,
1073 "tunnel_id: {:?}",
1074 tunnel_id
1075 ));
1076 }
1077 let conn = conn.unwrap();
1078 if let Some(id) = tokio::task::try_id() {
1079 if self.state_holder.has_state(id) {
1080 return Err(cmd_err!(
1081 CmdErrorCode::Failed,
1082 "can't send msg with resp in tunnel {:?} msg handle",
1083 conn.conn_id
1084 ));
1085 }
1086 }
1087 assert_eq!(tunnel_id, conn.conn_id);
1088 log::debug!(
1089 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1090 peer_id,
1091 conn.conn_id,
1092 cmd,
1093 body.len()
1094 );
1095 let seq = gen_seq();
1096 let header = CmdHeader::<LEN, CMD>::new(
1097 version,
1098 false,
1099 Some(seq),
1100 cmd,
1101 LEN::from_u64(body.len()).unwrap(),
1102 );
1103 let buf = header
1104 .to_vec()
1105 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1106 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1107 let waiter = self
1108 .resp_waiter
1109 .create_timeout_result_future(resp_id, timeout)
1110 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1111 {
1112 let mut send = conn.send.get().await;
1113 if buf.len() > 255 {
1114 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1115 }
1116 send.write_u8(buf.len() as u8)
1117 .await
1118 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1119 send.write_all(buf.as_slice())
1120 .await
1121 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1122 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1123 .await
1124 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1125 send.flush()
1126 .await
1127 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1128 }
1129 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1130 Ok(body)
1131 }
1132
1133 async fn send_by_all_tunnels(
1134 &self,
1135 peer_id: &PeerId,
1136 cmd: CMD,
1137 version: u8,
1138 body: &[u8],
1139 ) -> CmdResult<()> {
1140 let connections = self.peer_manager.find_connections(peer_id);
1141 for conn in connections {
1142 let _ret: CmdResult<()> = async move {
1143 let header = CmdHeader::<LEN, CMD>::new(
1144 version,
1145 false,
1146 None,
1147 cmd,
1148 LEN::from_u64(body.len() as u64).unwrap(),
1149 );
1150 let buf = header
1151 .to_vec()
1152 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1153 let mut send = conn.send.get().await;
1154 send.write_u8(buf.len() as u8)
1155 .await
1156 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1157 send.write_all(buf.as_slice())
1158 .await
1159 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1160 send.write_all(body)
1161 .await
1162 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1163 send.flush()
1164 .await
1165 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1166 Ok(())
1167 }
1168 .await;
1169 }
1170 Ok(())
1171 }
1172
1173 async fn send2_by_all_tunnels(
1174 &self,
1175 peer_id: &PeerId,
1176 cmd: CMD,
1177 version: u8,
1178 body: &[&[u8]],
1179 ) -> CmdResult<()> {
1180 let connections = self.peer_manager.find_connections(peer_id);
1181 let mut len = 0;
1182 for b in body.iter() {
1183 len += b.len();
1184 }
1185 for conn in connections {
1186 let _ret: CmdResult<()> = async move {
1187 let header = CmdHeader::<LEN, CMD>::new(
1188 version,
1189 false,
1190 None,
1191 cmd,
1192 LEN::from_u64(len as u64).unwrap(),
1193 );
1194 let buf = header
1195 .to_vec()
1196 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1197 if buf.len() > 255 {
1198 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1199 }
1200 let mut send = conn.send.get().await;
1201 send.write_u8(buf.len() as u8)
1202 .await
1203 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1204 send.write_all(buf.as_slice())
1205 .await
1206 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1207 for b in body.iter() {
1208 send.write_all(b)
1209 .await
1210 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1211 }
1212 send.flush()
1213 .await
1214 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1215 Ok(())
1216 }
1217 .await;
1218 }
1219 Ok(())
1220 }
1221}
1222
1223pub struct DefaultCmdServerIncoming<
1224 M: CmdTunnelMeta,
1225 R: CmdTunnelRead<M>,
1226 W: CmdTunnelWrite<M>,
1227 LISTENER,
1228> {
1229 tunnel_listener: LISTENER,
1230 tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1231 _p: PhantomData<fn() -> (M, R, W)>,
1232}
1233
1234impl<
1235 M: CmdTunnelMeta,
1236 R: CmdTunnelRead<M>,
1237 W: CmdTunnelWrite<M>,
1238 LISTENER: CmdTunnelListener<M, R, W>,
1239> DefaultCmdServerIncoming<M, R, W, LISTENER>
1240{
1241 pub fn new(
1242 tunnel_listener: LISTENER,
1243 tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1244 ) -> Arc<Self> {
1245 Arc::new(Self {
1246 tunnel_listener,
1247 tunnel_service,
1248 _p: PhantomData,
1249 })
1250 }
1251
1252 pub fn start(self: &Arc<Self>) {
1253 let this = self.clone();
1254 tokio::spawn(async move {
1255 if let Err(e) = this.run().await {
1256 log::error!("cmd server error: {:?}", e);
1257 }
1258 });
1259 }
1260
1261 pub async fn run(&self) -> CmdResult<()> {
1262 loop {
1263 let tunnel = self.tunnel_listener.accept().await?;
1264 let tunnel_service = self.tunnel_service.clone();
1265 tokio::spawn(async move {
1266 if let Err(e) = tunnel_service.handle_tunnel(tunnel).await {
1267 log::error!("peer connection error: {:?}", e);
1268 }
1269 });
1270 }
1271 }
1272}
1273
1274pub struct DefaultCmdServer<
1275 M: CmdTunnelMeta,
1276 R: CmdTunnelRead<M>,
1277 W: CmdTunnelWrite<M>,
1278 LEN,
1279 CMD,
1280 LISTENER,
1281> {
1282 incoming: Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>>,
1283 service: Arc<DefaultCmdServerService<M, R, W, LEN, CMD>>,
1284}
1285
1286impl<
1287 M: CmdTunnelMeta,
1288 R: CmdTunnelRead<M>,
1289 W: CmdTunnelWrite<M>,
1290 LEN: RawEncode
1291 + for<'a> RawDecode<'a>
1292 + Copy
1293 + RawFixedBytes
1294 + Sync
1295 + Send
1296 + 'static
1297 + FromPrimitive
1298 + ToPrimitive,
1299 CMD: RawEncode
1300 + for<'a> RawDecode<'a>
1301 + Copy
1302 + RawFixedBytes
1303 + Sync
1304 + Send
1305 + 'static
1306 + Eq
1307 + Hash
1308 + Debug,
1309 LISTENER: CmdTunnelListener<M, R, W>,
1310> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1311{
1312 pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
1313 let service = DefaultCmdServerService::new();
1314 let incoming = DefaultCmdServerIncoming::new(tunnel_listener, service.clone());
1315 Arc::new(Self { incoming, service })
1316 }
1317
1318 pub fn incoming(&self) -> Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>> {
1319 self.incoming.clone()
1320 }
1321
1322 pub fn service(&self) -> Arc<DefaultCmdServerService<M, R, W, LEN, CMD>> {
1323 self.service.clone()
1324 }
1325
1326 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
1327 self.service.attach_event_listener(event_listener);
1328 }
1329
1330 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
1331 self.service.get_peer_tunnels(peer_id).await
1332 }
1333
1334 pub fn start(self: &Arc<Self>) {
1335 self.incoming.start();
1336 }
1337}
1338
1339impl<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> Deref
1340 for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1341{
1342 type Target = DefaultCmdServerService<M, R, W, LEN, CMD>;
1343
1344 fn deref(&self) -> &Self::Target {
1345 self.service.as_ref()
1346 }
1347}
1348
1349#[async_trait::async_trait]
1350impl<
1351 M: CmdTunnelMeta,
1352 R: CmdTunnelRead<M>,
1353 W: CmdTunnelWrite<M>,
1354 LEN: RawEncode
1355 + for<'a> RawDecode<'a>
1356 + Copy
1357 + RawFixedBytes
1358 + Sync
1359 + Send
1360 + 'static
1361 + FromPrimitive
1362 + ToPrimitive,
1363 CMD: RawEncode
1364 + for<'a> RawDecode<'a>
1365 + Copy
1366 + RawFixedBytes
1367 + Sync
1368 + Send
1369 + 'static
1370 + Eq
1371 + Hash
1372 + Debug,
1373 LISTENER: CmdTunnelListener<M, R, W>,
1374> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1375{
1376 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
1377 self.service.register_cmd_handler(cmd, handler);
1378 }
1379
1380 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
1381 self.service.send(peer_id, cmd, version, body).await
1382 }
1383
1384 async fn send_with_resp(
1385 &self,
1386 peer_id: &PeerId,
1387 cmd: CMD,
1388 version: u8,
1389 body: &[u8],
1390 timeout: Duration,
1391 ) -> CmdResult<CmdBody> {
1392 self.service
1393 .send_with_resp(peer_id, cmd, version, body, timeout)
1394 .await
1395 }
1396
1397 async fn send2(
1398 &self,
1399 peer_id: &PeerId,
1400 cmd: CMD,
1401 version: u8,
1402 body: &[&[u8]],
1403 ) -> CmdResult<()> {
1404 self.service.send2(peer_id, cmd, version, body).await
1405 }
1406
1407 async fn send2_with_resp(
1408 &self,
1409 peer_id: &PeerId,
1410 cmd: CMD,
1411 version: u8,
1412 body: &[&[u8]],
1413 timeout: Duration,
1414 ) -> CmdResult<CmdBody> {
1415 self.service
1416 .send2_with_resp(peer_id, cmd, version, body, timeout)
1417 .await
1418 }
1419
1420 async fn send_cmd(
1421 &self,
1422 peer_id: &PeerId,
1423 cmd: CMD,
1424 version: u8,
1425 body: CmdBody,
1426 ) -> CmdResult<()> {
1427 self.service.send_cmd(peer_id, cmd, version, body).await
1428 }
1429
1430 async fn send_cmd_with_resp(
1431 &self,
1432 peer_id: &PeerId,
1433 cmd: CMD,
1434 version: u8,
1435 body: CmdBody,
1436 timeout: Duration,
1437 ) -> CmdResult<CmdBody> {
1438 self.service
1439 .send_cmd_with_resp(peer_id, cmd, version, body, timeout)
1440 .await
1441 }
1442
1443 async fn send_by_specify_tunnel(
1444 &self,
1445 peer_id: &PeerId,
1446 tunnel_id: TunnelId,
1447 cmd: CMD,
1448 version: u8,
1449 body: &[u8],
1450 ) -> CmdResult<()> {
1451 self.service
1452 .send_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1453 .await
1454 }
1455
1456 async fn send_by_specify_tunnel_with_resp(
1457 &self,
1458 peer_id: &PeerId,
1459 tunnel_id: TunnelId,
1460 cmd: CMD,
1461 version: u8,
1462 body: &[u8],
1463 timeout: Duration,
1464 ) -> CmdResult<CmdBody> {
1465 self.service
1466 .send_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1467 .await
1468 }
1469
1470 async fn send2_by_specify_tunnel(
1471 &self,
1472 peer_id: &PeerId,
1473 tunnel_id: TunnelId,
1474 cmd: CMD,
1475 version: u8,
1476 body: &[&[u8]],
1477 ) -> CmdResult<()> {
1478 self.service
1479 .send2_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1480 .await
1481 }
1482
1483 async fn send2_by_specify_tunnel_with_resp(
1484 &self,
1485 peer_id: &PeerId,
1486 tunnel_id: TunnelId,
1487 cmd: CMD,
1488 version: u8,
1489 body: &[&[u8]],
1490 timeout: Duration,
1491 ) -> CmdResult<CmdBody> {
1492 self.service
1493 .send2_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1494 .await
1495 }
1496
1497 async fn send_cmd_by_specify_tunnel(
1498 &self,
1499 peer_id: &PeerId,
1500 tunnel_id: TunnelId,
1501 cmd: CMD,
1502 version: u8,
1503 body: CmdBody,
1504 ) -> CmdResult<()> {
1505 self.service
1506 .send_cmd_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1507 .await
1508 }
1509
1510 async fn send_cmd_by_specify_tunnel_with_resp(
1511 &self,
1512 peer_id: &PeerId,
1513 tunnel_id: TunnelId,
1514 cmd: CMD,
1515 version: u8,
1516 body: CmdBody,
1517 timeout: Duration,
1518 ) -> CmdResult<CmdBody> {
1519 self.service
1520 .send_cmd_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1521 .await
1522 }
1523
1524 async fn send_by_all_tunnels(
1525 &self,
1526 peer_id: &PeerId,
1527 cmd: CMD,
1528 version: u8,
1529 body: &[u8],
1530 ) -> CmdResult<()> {
1531 self.service
1532 .send_by_all_tunnels(peer_id, cmd, version, body)
1533 .await
1534 }
1535
1536 async fn send2_by_all_tunnels(
1537 &self,
1538 peer_id: &PeerId,
1539 cmd: CMD,
1540 version: u8,
1541 body: &[&[u8]],
1542 ) -> CmdResult<()> {
1543 self.service
1544 .send2_by_all_tunnels(peer_id, cmd, version, body)
1545 .await
1546 }
1547}