Skip to main content

sfo_cmd_server/server/
server.rs

1use super::CmdServer;
2use super::peer_manager::{PeerManager, PeerManagerRef};
3use crate::client::{RespWaiter, RespWaiterRef, gen_resp_id, gen_seq};
4use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
5use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
6use crate::peer_connection::PeerConnection;
7use crate::peer_id::PeerId;
8use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
9use async_named_locker::{NamedStateHolder, ObjectHolder};
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_split::Splittable;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::marker::PhantomData;
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
20
21#[async_trait::async_trait]
22pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
23    Send + Sync + 'static
24{
25    async fn accept(&self) -> CmdResult<Splittable<R, W>>;
26}
27
28#[async_trait::async_trait]
29pub trait CmdTunnelService<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
30    Send + Sync + 'static
31{
32    async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()>;
33}
34
35#[async_trait::async_trait]
36pub trait CmdServerEventListener: Send + Sync + 'static {
37    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
38    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
39}
40
41#[derive(Clone)]
42struct CmdServerEventListenerEmit {
43    listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
44}
45
46impl CmdServerEventListenerEmit {
47    pub fn new() -> Self {
48        Self {
49            listeners: Arc::new(Mutex::new(Vec::new())),
50        }
51    }
52
53    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
54        self.listeners.lock().unwrap().push(event_listener);
55    }
56}
57
58#[async_trait::async_trait]
59impl CmdServerEventListener for CmdServerEventListenerEmit {
60    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
61        let listeners = { self.listeners.lock().unwrap().clone() };
62        for listener in listeners.iter() {
63            if let Err(e) = listener.on_peer_connected(peer_id).await {
64                log::error!("on_peer_connected error: {:?}", e);
65            }
66        }
67        Ok(())
68    }
69
70    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
71        let listeners = { self.listeners.lock().unwrap().clone() };
72        for listener in listeners.iter() {
73            if let Err(e) = listener.on_peer_disconnected(peer_id).await {
74                log::error!("on_peer_disconnected error: {:?}", e);
75            }
76        }
77        Ok(())
78    }
79}
80
81pub struct DefaultCmdServerService<
82    M: CmdTunnelMeta,
83    R: CmdTunnelRead<M>,
84    W: CmdTunnelWrite<M>,
85    LEN,
86    CMD,
87> {
88    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
89    peer_manager: PeerManagerRef<M, R, W>,
90    event_emit: CmdServerEventListenerEmit,
91    resp_waiter: RespWaiterRef,
92    state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
93    _p: PhantomData<fn() -> (M, R, W, LEN, CMD)>,
94}
95
96impl<
97    M: CmdTunnelMeta,
98    R: CmdTunnelRead<M>,
99    W: CmdTunnelWrite<M>,
100    LEN: RawEncode
101        + for<'a> RawDecode<'a>
102        + Copy
103        + RawFixedBytes
104        + Sync
105        + Send
106        + 'static
107        + FromPrimitive
108        + ToPrimitive,
109    CMD: RawEncode
110        + for<'a> RawDecode<'a>
111        + Copy
112        + RawFixedBytes
113        + Sync
114        + Send
115        + 'static
116        + Eq
117        + Hash
118        + Debug,
119> DefaultCmdServerService<M, R, W, LEN, CMD>
120{
121    pub fn new() -> Arc<Self> {
122        let event_emit = CmdServerEventListenerEmit::new();
123        Arc::new(Self {
124            cmd_handler_map: Arc::new(CmdHandlerMap::new()),
125            peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
126            event_emit,
127            resp_waiter: Arc::new(RespWaiter::new()),
128            state_holder: NamedStateHolder::new(),
129            _p: PhantomData,
130        })
131    }
132
133    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
134        self.event_emit.attach_event_listener(event_listener);
135    }
136
137    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
138        self.peer_manager.find_connections(peer_id)
139    }
140
141    pub async fn serve_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
142        let peer_id = tunnel.get_remote_peer_id();
143        let tunnel_id = self.peer_manager.generate_conn_id();
144        let cmd_handler_map = self.cmd_handler_map.clone();
145        let resp_waiter = self.resp_waiter.clone();
146        let state_holder = self.state_holder.clone();
147        let (mut reader, writer) = tunnel.split();
148        let writer = ObjectHolder::new(writer);
149        let resp_write = writer.clone();
150        let remote_id = peer_id.clone();
151        let recv_handle = tokio::spawn(async move {
152            let ret: CmdResult<()> = async move {
153                loop {
154                    let header_len = reader
155                        .read_u8()
156                        .await
157                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
158                    let mut header = vec![0u8; header_len as usize];
159                    let n = reader
160                        .read_exact(&mut header)
161                        .await
162                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
163                    if n == 0 {
164                        break;
165                    }
166                    let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
167                        .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
168                    sfo_log::debug!("recv cmd {:?}", header.cmd_code());
169                    let body_len = header.pkg_len().to_u64().unwrap();
170                    let cmd_read = CmdBodyRead::new(reader, body_len as usize);
171                    let waiter = cmd_read.get_waiter();
172                    let future = waiter
173                        .create_result_future()
174                        .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
175                    {
176                        let body = CmdBody::from_reader(BufReader::new(cmd_read), body_len);
177                        if header.is_resp() && header.seq().is_some() {
178                            let resp_id =
179                                gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
180                            let _ = resp_waiter.set_result(resp_id, body);
181                        } else if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
182                            let version = header.version();
183                            let seq = header.seq();
184                            let cmd_code = header.cmd_code();
185                            match {
186                                let _handle_state = state_holder.new_state(tokio::task::id());
187                                handler
188                                    .handle(remote_id.clone(), tunnel_id, header, body)
189                                    .await
190                            } {
191                                Ok(Some(mut body)) => {
192                                    let mut write = resp_write.get().await;
193                                    let header = CmdHeader::<LEN, CMD>::new(
194                                        version,
195                                        true,
196                                        seq,
197                                        cmd_code,
198                                        LEN::from_u64(body.len()).unwrap(),
199                                    );
200                                    let buf = header
201                                        .to_vec()
202                                        .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
203                                    if buf.len() > 255 {
204                                        return Err(cmd_err!(
205                                            CmdErrorCode::RawCodecError,
206                                            "header len too large"
207                                        ));
208                                    }
209                                    write
210                                        .write_u8(buf.len() as u8)
211                                        .await
212                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
213                                    write
214                                        .write_all(buf.as_slice())
215                                        .await
216                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
217                                    tokio::io::copy(&mut body, write.deref_mut().deref_mut())
218                                        .await
219                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
220                                    write
221                                        .flush()
222                                        .await
223                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
224                                }
225                                Err(e) => {
226                                    log::error!("handle cmd error: {:?}", e);
227                                }
228                                Ok(None) => {}
229                            }
230                        }
231                    }
232                    reader = future
233                        .await
234                        .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
235                }
236                Ok(())
237            }
238            .await;
239            if let Err(e) = ret.as_ref() {
240                log::error!("recv cmd error: {:?}", e);
241            }
242            ret
243        });
244
245        let peer_conn = PeerConnection {
246            conn_id: tunnel_id,
247            peer_id,
248            send: writer,
249            handle: Some(recv_handle),
250        };
251        self.peer_manager.add_peer_connection(peer_conn).await;
252        Ok(())
253    }
254}
255
256#[async_trait::async_trait]
257impl<
258    M: CmdTunnelMeta,
259    R: CmdTunnelRead<M>,
260    W: CmdTunnelWrite<M>,
261    LEN: RawEncode
262        + for<'a> RawDecode<'a>
263        + Copy
264        + RawFixedBytes
265        + Sync
266        + Send
267        + 'static
268        + FromPrimitive
269        + ToPrimitive,
270    CMD: RawEncode
271        + for<'a> RawDecode<'a>
272        + Copy
273        + RawFixedBytes
274        + Sync
275        + Send
276        + 'static
277        + Eq
278        + Hash
279        + Debug,
280> CmdTunnelService<M, R, W> for DefaultCmdServerService<M, R, W, LEN, CMD>
281{
282    async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
283        self.serve_tunnel(tunnel).await
284    }
285}
286
287#[async_trait::async_trait]
288impl<
289    M: CmdTunnelMeta,
290    R: CmdTunnelRead<M>,
291    W: CmdTunnelWrite<M>,
292    LEN: RawEncode
293        + for<'a> RawDecode<'a>
294        + Copy
295        + RawFixedBytes
296        + Sync
297        + Send
298        + 'static
299        + FromPrimitive
300        + ToPrimitive,
301    CMD: RawEncode
302        + for<'a> RawDecode<'a>
303        + Copy
304        + RawFixedBytes
305        + Sync
306        + Send
307        + 'static
308        + Eq
309        + Hash
310        + Debug,
311> CmdServer<LEN, CMD> for DefaultCmdServerService<M, R, W, LEN, CMD>
312{
313    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
314        self.cmd_handler_map.insert(cmd, handler);
315    }
316
317    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
318        let connections = self.peer_manager.find_connections(peer_id);
319        for conn in connections {
320            let ret: CmdResult<()> = async move {
321                log::debug!(
322                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
323                    peer_id,
324                    conn.conn_id,
325                    cmd,
326                    body.len(),
327                    hex::encode(body)
328                );
329                let header = CmdHeader::<LEN, CMD>::new(
330                    version,
331                    false,
332                    None,
333                    cmd,
334                    LEN::from_u64(body.len() as u64).unwrap(),
335                );
336                let buf = header
337                    .to_vec()
338                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
339                let mut send = conn.send.get().await;
340                if buf.len() > 255 {
341                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
342                }
343                send.write_u8(buf.len() as u8)
344                    .await
345                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
346                send.write_all(buf.as_slice())
347                    .await
348                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
349                send.write_all(body)
350                    .await
351                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
352                send.flush()
353                    .await
354                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
355                Ok(())
356            }
357            .await;
358            if ret.is_ok() {
359                break;
360            }
361        }
362        Ok(())
363    }
364
365    async fn send_with_resp(
366        &self,
367        peer_id: &PeerId,
368        cmd: CMD,
369        version: u8,
370        body: &[u8],
371        timeout: Duration,
372    ) -> CmdResult<CmdBody> {
373        let connections = self.peer_manager.find_connections(peer_id);
374        for conn in connections {
375            if let Some(id) = tokio::task::try_id() {
376                if self.state_holder.has_state(id) {
377                    continue;
378                }
379            }
380            let ret: CmdResult<CmdBody> = async move {
381                log::debug!(
382                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
383                    peer_id,
384                    conn.conn_id,
385                    cmd,
386                    body.len(),
387                    hex::encode(body)
388                );
389                let seq = gen_seq();
390                let header = CmdHeader::<LEN, CMD>::new(
391                    version,
392                    false,
393                    Some(seq),
394                    cmd,
395                    LEN::from_u64(body.len() as u64).unwrap(),
396                );
397                let buf = header
398                    .to_vec()
399                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
400                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
401                let waiter = self
402                    .resp_waiter
403                    .create_timeout_result_future(resp_id, timeout)
404                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
405                {
406                    let mut send = conn.send.get().await;
407                    if buf.len() > 255 {
408                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
409                    }
410                    send.write_u8(buf.len() as u8)
411                        .await
412                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
413                    send.write_all(buf.as_slice())
414                        .await
415                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
416                    send.write_all(body)
417                        .await
418                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
419                    send.flush()
420                        .await
421                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
422                }
423                let body =
424                    waiter
425                        .await
426                        .map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
427                Ok(body)
428            }
429            .await;
430            if ret.is_ok() {
431                return ret;
432            } else {
433                sfo_log::error!("send err {:?}", ret.unwrap_err());
434            }
435        }
436        Err(cmd_err!(
437            CmdErrorCode::Failed,
438            "send to peer_id: {}",
439            peer_id
440        ))
441    }
442
443    async fn send2(
444        &self,
445        peer_id: &PeerId,
446        cmd: CMD,
447        version: u8,
448        body: &[&[u8]],
449    ) -> CmdResult<()> {
450        let connections = self.peer_manager.find_connections(peer_id);
451        for conn in connections {
452            let ret: CmdResult<()> = async move {
453                let mut len = 0;
454                for b in body.iter() {
455                    len += b.len();
456                    log::debug!(
457                        "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
458                        peer_id,
459                        conn.conn_id,
460                        cmd,
461                        hex::encode(b)
462                    );
463                }
464                log::debug!(
465                    "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
466                    peer_id,
467                    conn.conn_id,
468                    cmd,
469                    len
470                );
471                let header = CmdHeader::<LEN, CMD>::new(
472                    version,
473                    false,
474                    None,
475                    cmd,
476                    LEN::from_u64(len as u64).unwrap(),
477                );
478                let buf = header
479                    .to_vec()
480                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
481                let mut send = conn.send.get().await;
482                if buf.len() > 255 {
483                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
484                }
485                send.write_u8(buf.len() as u8)
486                    .await
487                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
488                send.write_all(buf.as_slice())
489                    .await
490                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
491                for b in body.iter() {
492                    send.write_all(b)
493                        .await
494                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
495                }
496                send.flush()
497                    .await
498                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
499                Ok(())
500            }
501            .await;
502            if ret.is_ok() {
503                break;
504            }
505        }
506        Ok(())
507    }
508
509    async fn send2_with_resp(
510        &self,
511        peer_id: &PeerId,
512        cmd: CMD,
513        version: u8,
514        body: &[&[u8]],
515        timeout: Duration,
516    ) -> CmdResult<CmdBody> {
517        let connections = self.peer_manager.find_connections(peer_id);
518        for conn in connections {
519            if let Some(id) = tokio::task::try_id() {
520                if self.state_holder.has_state(id) {
521                    continue;
522                }
523            }
524            let ret: CmdResult<CmdBody> = async move {
525                let mut len = 0;
526                for b in body.iter() {
527                    len += b.len();
528                    log::debug!(
529                        "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
530                        peer_id,
531                        conn.conn_id,
532                        cmd,
533                        hex::encode(b)
534                    );
535                }
536                log::debug!(
537                    "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
538                    peer_id,
539                    conn.conn_id,
540                    cmd,
541                    len
542                );
543                let seq = gen_seq();
544                let header = CmdHeader::<LEN, CMD>::new(
545                    version,
546                    false,
547                    Some(seq),
548                    cmd,
549                    LEN::from_u64(len as u64).unwrap(),
550                );
551                let buf = header
552                    .to_vec()
553                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
554                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
555                let waiter = self
556                    .resp_waiter
557                    .create_timeout_result_future(resp_id, timeout)
558                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
559                {
560                    let mut send = conn.send.get().await;
561                    if buf.len() > 255 {
562                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
563                    }
564                    send.write_u8(buf.len() as u8)
565                        .await
566                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
567                    send.write_all(buf.as_slice())
568                        .await
569                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
570                    for b in body.iter() {
571                        send.write_all(b)
572                            .await
573                            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
574                    }
575                    send.flush()
576                        .await
577                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
578                }
579                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
580                Ok(body)
581            }
582            .await;
583            if ret.is_ok() {
584                return ret;
585            }
586        }
587        Err(cmd_err!(
588            CmdErrorCode::Failed,
589            "send to peer_id: {}",
590            peer_id
591        ))
592    }
593
594    async fn send_cmd(
595        &self,
596        peer_id: &PeerId,
597        cmd: CMD,
598        version: u8,
599        body: CmdBody,
600    ) -> CmdResult<()> {
601        let body_data = body.into_bytes().await?;
602        let body = body_data.as_slice();
603        let connections = self.peer_manager.find_connections(peer_id);
604        for conn in connections {
605            let ret: CmdResult<()> = async move {
606                log::debug!(
607                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
608                    peer_id,
609                    conn.conn_id,
610                    cmd,
611                    body.len(),
612                    hex::encode(body)
613                );
614                let header = CmdHeader::<LEN, CMD>::new(
615                    version,
616                    false,
617                    None,
618                    cmd,
619                    LEN::from_u64(body.len() as u64).unwrap(),
620                );
621                let buf = header
622                    .to_vec()
623                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
624                let mut send = conn.send.get().await;
625                if buf.len() > 255 {
626                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
627                }
628                send.write_u8(buf.len() as u8)
629                    .await
630                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
631                send.write_all(buf.as_slice())
632                    .await
633                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
634                send.write_all(body)
635                    .await
636                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
637                send.flush()
638                    .await
639                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
640                Ok(())
641            }
642            .await;
643            if ret.is_ok() {
644                break;
645            }
646        }
647        Ok(())
648    }
649
650    async fn send_cmd_with_resp(
651        &self,
652        peer_id: &PeerId,
653        cmd: CMD,
654        version: u8,
655        body: CmdBody,
656        timeout: Duration,
657    ) -> CmdResult<CmdBody> {
658        let connections = self.peer_manager.find_connections(peer_id);
659        let body_data = body.into_bytes().await?;
660        let data_ref = body_data.as_slice();
661        for conn in connections {
662            if let Some(id) = tokio::task::try_id() {
663                if self.state_holder.has_state(id) {
664                    continue;
665                }
666            }
667            let ret: CmdResult<CmdBody> = async move {
668                log::debug!(
669                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}",
670                    peer_id,
671                    conn.conn_id,
672                    cmd,
673                    data_ref.len()
674                );
675                let seq = gen_seq();
676                let header = CmdHeader::<LEN, CMD>::new(
677                    version,
678                    false,
679                    Some(seq),
680                    cmd,
681                    LEN::from_u64(data_ref.len() as u64).unwrap(),
682                );
683                let buf = header
684                    .to_vec()
685                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
686                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
687                let waiter = self
688                    .resp_waiter
689                    .create_timeout_result_future(resp_id, timeout)
690                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
691                {
692                    let mut send = conn.send.get().await;
693                    if buf.len() > 255 {
694                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
695                    }
696                    send.write_u8(buf.len() as u8)
697                        .await
698                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
699                    send.write_all(buf.as_slice())
700                        .await
701                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
702                    send.write_all(data_ref)
703                        .await
704                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
705                    send.flush()
706                        .await
707                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
708                }
709                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
710                Ok(body)
711            }
712            .await;
713            if ret.is_ok() {
714                return ret;
715            }
716        }
717        Err(cmd_err!(
718            CmdErrorCode::Failed,
719            "send to peer_id: {}",
720            peer_id
721        ))
722    }
723
724    async fn send_by_specify_tunnel(
725        &self,
726        peer_id: &PeerId,
727        tunnel_id: TunnelId,
728        cmd: CMD,
729        version: u8,
730        body: &[u8],
731    ) -> CmdResult<()> {
732        let conn = self.peer_manager.find_connection(tunnel_id);
733        if conn.is_none() {
734            return Err(cmd_err!(
735                CmdErrorCode::PeerConnectionNotFound,
736                "tunnel_id: {:?}",
737                tunnel_id
738            ));
739        }
740        let conn = conn.unwrap();
741        assert_eq!(tunnel_id, conn.conn_id);
742        log::trace!(
743            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
744            peer_id,
745            conn.conn_id,
746            cmd,
747            body.len(),
748            hex::encode(body)
749        );
750        let header = CmdHeader::<LEN, CMD>::new(
751            version,
752            false,
753            None,
754            cmd,
755            LEN::from_u64(body.len() as u64).unwrap(),
756        );
757        let buf = header
758            .to_vec()
759            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
760        let mut send = conn.send.get().await;
761        if buf.len() > 255 {
762            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
763        }
764        send.write_u8(buf.len() as u8)
765            .await
766            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
767        send.write_all(buf.as_slice())
768            .await
769            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
770        send.write_all(body)
771            .await
772            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
773        send.flush()
774            .await
775            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
776        Ok(())
777    }
778
779    async fn send_by_specify_tunnel_with_resp(
780        &self,
781        peer_id: &PeerId,
782        tunnel_id: TunnelId,
783        cmd: CMD,
784        version: u8,
785        body: &[u8],
786        timeout: Duration,
787    ) -> CmdResult<CmdBody> {
788        let conn = self.peer_manager.find_connection(tunnel_id);
789        if conn.is_none() {
790            return Err(cmd_err!(
791                CmdErrorCode::PeerConnectionNotFound,
792                "tunnel_id: {:?}",
793                tunnel_id
794            ));
795        }
796        let conn = conn.unwrap();
797        if let Some(id) = tokio::task::try_id() {
798            if self.state_holder.has_state(id) {
799                return Err(cmd_err!(
800                    CmdErrorCode::Failed,
801                    "can't send msg with resp in tunnel {:?} msg handle",
802                    conn.conn_id
803                ));
804            }
805        }
806        assert_eq!(tunnel_id, conn.conn_id);
807        log::trace!(
808            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
809            peer_id,
810            conn.conn_id,
811            cmd,
812            body.len(),
813            hex::encode(body)
814        );
815        let seq = gen_seq();
816        let header = CmdHeader::<LEN, CMD>::new(
817            version,
818            false,
819            Some(seq),
820            cmd,
821            LEN::from_u64(body.len() as u64).unwrap(),
822        );
823        let buf = header
824            .to_vec()
825            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
826        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
827        let waiter = self
828            .resp_waiter
829            .create_timeout_result_future(resp_id, timeout)
830            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
831        {
832            let mut send = conn.send.get().await;
833            if buf.len() > 255 {
834                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
835            }
836            send.write_u8(buf.len() as u8)
837                .await
838                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
839            send.write_all(buf.as_slice())
840                .await
841                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
842            send.write_all(body)
843                .await
844                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
845            send.flush()
846                .await
847                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
848        }
849        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
850        Ok(body)
851    }
852
853    async fn send2_by_specify_tunnel(
854        &self,
855        peer_id: &PeerId,
856        tunnel_id: TunnelId,
857        cmd: CMD,
858        version: u8,
859        body: &[&[u8]],
860    ) -> CmdResult<()> {
861        let conn = self.peer_manager.find_connection(tunnel_id);
862        if conn.is_none() {
863            return Err(cmd_err!(
864                CmdErrorCode::PeerConnectionNotFound,
865                "tunnel_id: {:?}",
866                tunnel_id
867            ));
868        }
869        let conn = conn.unwrap();
870        assert_eq!(tunnel_id, conn.conn_id);
871        let mut len = 0;
872        for b in body.iter() {
873            len += b.len();
874            log::debug!(
875                "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
876                peer_id,
877                conn.conn_id,
878                cmd,
879                hex::encode(b)
880            );
881        }
882        log::debug!(
883            "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
884            peer_id,
885            conn.conn_id,
886            cmd,
887            len
888        );
889        let header = CmdHeader::<LEN, CMD>::new(
890            version,
891            false,
892            None,
893            cmd,
894            LEN::from_u64(len as u64).unwrap(),
895        );
896        let buf = header
897            .to_vec()
898            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
899        if buf.len() > 255 {
900            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
901        }
902        let mut send = conn.send.get().await;
903        send.write_u8(buf.len() as u8)
904            .await
905            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
906        send.write_all(buf.as_slice())
907            .await
908            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
909        for b in body.iter() {
910            send.write_all(b)
911                .await
912                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
913        }
914        send.flush()
915            .await
916            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
917        Ok(())
918    }
919
920    async fn send2_by_specify_tunnel_with_resp(
921        &self,
922        peer_id: &PeerId,
923        tunnel_id: TunnelId,
924        cmd: CMD,
925        version: u8,
926        body: &[&[u8]],
927        timeout: Duration,
928    ) -> CmdResult<CmdBody> {
929        let conn = self.peer_manager.find_connection(tunnel_id);
930        if conn.is_none() {
931            return Err(cmd_err!(
932                CmdErrorCode::PeerConnectionNotFound,
933                "tunnel_id: {:?}",
934                tunnel_id
935            ));
936        }
937        let conn = conn.unwrap();
938        if let Some(id) = tokio::task::try_id() {
939            if self.state_holder.has_state(id) {
940                return Err(cmd_err!(
941                    CmdErrorCode::Failed,
942                    "can't send msg with resp in tunnel {:?} msg handle",
943                    conn.conn_id
944                ));
945            }
946        }
947        assert_eq!(tunnel_id, conn.conn_id);
948        let mut len = 0;
949        for b in body.iter() {
950            len += b.len();
951            log::debug!(
952                "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
953                peer_id,
954                conn.conn_id,
955                cmd,
956                hex::encode(b)
957            );
958        }
959        log::debug!(
960            "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
961            peer_id,
962            conn.conn_id,
963            cmd,
964            len
965        );
966        let seq = gen_seq();
967        let header = CmdHeader::<LEN, CMD>::new(
968            version,
969            false,
970            Some(seq),
971            cmd,
972            LEN::from_u64(len as u64).unwrap(),
973        );
974        let buf = header
975            .to_vec()
976            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
977        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
978        let waiter = self
979            .resp_waiter
980            .create_timeout_result_future(resp_id, timeout)
981            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
982        if buf.len() > 255 {
983            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
984        }
985        {
986            let mut send = conn.send.get().await;
987            send.write_u8(buf.len() as u8)
988                .await
989                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
990            send.write_all(buf.as_slice())
991                .await
992                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
993            for b in body.iter() {
994                send.write_all(b)
995                    .await
996                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
997            }
998            send.flush()
999                .await
1000                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1001        }
1002        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1003        Ok(body)
1004    }
1005
1006    async fn send_cmd_by_specify_tunnel(
1007        &self,
1008        peer_id: &PeerId,
1009        tunnel_id: TunnelId,
1010        cmd: CMD,
1011        version: u8,
1012        mut body: CmdBody,
1013    ) -> CmdResult<()> {
1014        let conn = self.peer_manager.find_connection(tunnel_id);
1015        if conn.is_none() {
1016            return Err(cmd_err!(
1017                CmdErrorCode::PeerConnectionNotFound,
1018                "tunnel_id: {:?}",
1019                tunnel_id
1020            ));
1021        }
1022        let conn = conn.unwrap();
1023        assert_eq!(tunnel_id, conn.conn_id);
1024        log::debug!(
1025            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1026            peer_id,
1027            conn.conn_id,
1028            cmd,
1029            body.len()
1030        );
1031        let header = CmdHeader::<LEN, CMD>::new(
1032            version,
1033            false,
1034            None,
1035            cmd,
1036            LEN::from_u64(body.len()).unwrap(),
1037        );
1038        let buf = header
1039            .to_vec()
1040            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1041        let mut send = conn.send.get().await;
1042        if buf.len() > 255 {
1043            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1044        }
1045        send.write_u8(buf.len() as u8)
1046            .await
1047            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1048        send.write_all(buf.as_slice())
1049            .await
1050            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1051        tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1052            .await
1053            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1054        send.flush()
1055            .await
1056            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1057        Ok(())
1058    }
1059
1060    async fn send_cmd_by_specify_tunnel_with_resp(
1061        &self,
1062        peer_id: &PeerId,
1063        tunnel_id: TunnelId,
1064        cmd: CMD,
1065        version: u8,
1066        mut body: CmdBody,
1067        timeout: Duration,
1068    ) -> CmdResult<CmdBody> {
1069        let conn = self.peer_manager.find_connection(tunnel_id);
1070        if conn.is_none() {
1071            return Err(cmd_err!(
1072                CmdErrorCode::PeerConnectionNotFound,
1073                "tunnel_id: {:?}",
1074                tunnel_id
1075            ));
1076        }
1077        let conn = conn.unwrap();
1078        if let Some(id) = tokio::task::try_id() {
1079            if self.state_holder.has_state(id) {
1080                return Err(cmd_err!(
1081                    CmdErrorCode::Failed,
1082                    "can't send msg with resp in tunnel {:?} msg handle",
1083                    conn.conn_id
1084                ));
1085            }
1086        }
1087        assert_eq!(tunnel_id, conn.conn_id);
1088        log::debug!(
1089            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1090            peer_id,
1091            conn.conn_id,
1092            cmd,
1093            body.len()
1094        );
1095        let seq = gen_seq();
1096        let header = CmdHeader::<LEN, CMD>::new(
1097            version,
1098            false,
1099            Some(seq),
1100            cmd,
1101            LEN::from_u64(body.len()).unwrap(),
1102        );
1103        let buf = header
1104            .to_vec()
1105            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1106        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1107        let waiter = self
1108            .resp_waiter
1109            .create_timeout_result_future(resp_id, timeout)
1110            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1111        {
1112            let mut send = conn.send.get().await;
1113            if buf.len() > 255 {
1114                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1115            }
1116            send.write_u8(buf.len() as u8)
1117                .await
1118                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1119            send.write_all(buf.as_slice())
1120                .await
1121                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1122            tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1123                .await
1124                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1125            send.flush()
1126                .await
1127                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1128        }
1129        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1130        Ok(body)
1131    }
1132
1133    async fn send_by_all_tunnels(
1134        &self,
1135        peer_id: &PeerId,
1136        cmd: CMD,
1137        version: u8,
1138        body: &[u8],
1139    ) -> CmdResult<()> {
1140        let connections = self.peer_manager.find_connections(peer_id);
1141        for conn in connections {
1142            let _ret: CmdResult<()> = async move {
1143                let header = CmdHeader::<LEN, CMD>::new(
1144                    version,
1145                    false,
1146                    None,
1147                    cmd,
1148                    LEN::from_u64(body.len() as u64).unwrap(),
1149                );
1150                let buf = header
1151                    .to_vec()
1152                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1153                let mut send = conn.send.get().await;
1154                send.write_u8(buf.len() as u8)
1155                    .await
1156                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1157                send.write_all(buf.as_slice())
1158                    .await
1159                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1160                send.write_all(body)
1161                    .await
1162                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1163                send.flush()
1164                    .await
1165                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1166                Ok(())
1167            }
1168            .await;
1169        }
1170        Ok(())
1171    }
1172
1173    async fn send2_by_all_tunnels(
1174        &self,
1175        peer_id: &PeerId,
1176        cmd: CMD,
1177        version: u8,
1178        body: &[&[u8]],
1179    ) -> CmdResult<()> {
1180        let connections = self.peer_manager.find_connections(peer_id);
1181        let mut len = 0;
1182        for b in body.iter() {
1183            len += b.len();
1184        }
1185        for conn in connections {
1186            let _ret: CmdResult<()> = async move {
1187                let header = CmdHeader::<LEN, CMD>::new(
1188                    version,
1189                    false,
1190                    None,
1191                    cmd,
1192                    LEN::from_u64(len as u64).unwrap(),
1193                );
1194                let buf = header
1195                    .to_vec()
1196                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1197                if buf.len() > 255 {
1198                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1199                }
1200                let mut send = conn.send.get().await;
1201                send.write_u8(buf.len() as u8)
1202                    .await
1203                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1204                send.write_all(buf.as_slice())
1205                    .await
1206                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1207                for b in body.iter() {
1208                    send.write_all(b)
1209                        .await
1210                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1211                }
1212                send.flush()
1213                    .await
1214                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1215                Ok(())
1216            }
1217            .await;
1218        }
1219        Ok(())
1220    }
1221}
1222
1223pub struct DefaultCmdServerIncoming<
1224    M: CmdTunnelMeta,
1225    R: CmdTunnelRead<M>,
1226    W: CmdTunnelWrite<M>,
1227    LISTENER,
1228> {
1229    tunnel_listener: LISTENER,
1230    tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1231    _p: PhantomData<fn() -> (M, R, W)>,
1232}
1233
1234impl<
1235    M: CmdTunnelMeta,
1236    R: CmdTunnelRead<M>,
1237    W: CmdTunnelWrite<M>,
1238    LISTENER: CmdTunnelListener<M, R, W>,
1239> DefaultCmdServerIncoming<M, R, W, LISTENER>
1240{
1241    pub fn new(
1242        tunnel_listener: LISTENER,
1243        tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1244    ) -> Arc<Self> {
1245        Arc::new(Self {
1246            tunnel_listener,
1247            tunnel_service,
1248            _p: PhantomData,
1249        })
1250    }
1251
1252    pub fn start(self: &Arc<Self>) {
1253        let this = self.clone();
1254        tokio::spawn(async move {
1255            if let Err(e) = this.run().await {
1256                log::error!("cmd server error: {:?}", e);
1257            }
1258        });
1259    }
1260
1261    pub async fn run(&self) -> CmdResult<()> {
1262        loop {
1263            let tunnel = self.tunnel_listener.accept().await?;
1264            let tunnel_service = self.tunnel_service.clone();
1265            tokio::spawn(async move {
1266                if let Err(e) = tunnel_service.handle_tunnel(tunnel).await {
1267                    log::error!("peer connection error: {:?}", e);
1268                }
1269            });
1270        }
1271    }
1272}
1273
1274pub struct DefaultCmdServer<
1275    M: CmdTunnelMeta,
1276    R: CmdTunnelRead<M>,
1277    W: CmdTunnelWrite<M>,
1278    LEN,
1279    CMD,
1280    LISTENER,
1281> {
1282    incoming: Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>>,
1283    service: Arc<DefaultCmdServerService<M, R, W, LEN, CMD>>,
1284}
1285
1286impl<
1287    M: CmdTunnelMeta,
1288    R: CmdTunnelRead<M>,
1289    W: CmdTunnelWrite<M>,
1290    LEN: RawEncode
1291        + for<'a> RawDecode<'a>
1292        + Copy
1293        + RawFixedBytes
1294        + Sync
1295        + Send
1296        + 'static
1297        + FromPrimitive
1298        + ToPrimitive,
1299    CMD: RawEncode
1300        + for<'a> RawDecode<'a>
1301        + Copy
1302        + RawFixedBytes
1303        + Sync
1304        + Send
1305        + 'static
1306        + Eq
1307        + Hash
1308        + Debug,
1309    LISTENER: CmdTunnelListener<M, R, W>,
1310> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1311{
1312    pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
1313        let service = DefaultCmdServerService::new();
1314        let incoming = DefaultCmdServerIncoming::new(tunnel_listener, service.clone());
1315        Arc::new(Self { incoming, service })
1316    }
1317
1318    pub fn incoming(&self) -> Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>> {
1319        self.incoming.clone()
1320    }
1321
1322    pub fn service(&self) -> Arc<DefaultCmdServerService<M, R, W, LEN, CMD>> {
1323        self.service.clone()
1324    }
1325
1326    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
1327        self.service.attach_event_listener(event_listener);
1328    }
1329
1330    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
1331        self.service.get_peer_tunnels(peer_id).await
1332    }
1333
1334    pub fn start(self: &Arc<Self>) {
1335        self.incoming.start();
1336    }
1337}
1338
1339impl<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> Deref
1340    for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1341{
1342    type Target = DefaultCmdServerService<M, R, W, LEN, CMD>;
1343
1344    fn deref(&self) -> &Self::Target {
1345        self.service.as_ref()
1346    }
1347}
1348
1349#[async_trait::async_trait]
1350impl<
1351    M: CmdTunnelMeta,
1352    R: CmdTunnelRead<M>,
1353    W: CmdTunnelWrite<M>,
1354    LEN: RawEncode
1355        + for<'a> RawDecode<'a>
1356        + Copy
1357        + RawFixedBytes
1358        + Sync
1359        + Send
1360        + 'static
1361        + FromPrimitive
1362        + ToPrimitive,
1363    CMD: RawEncode
1364        + for<'a> RawDecode<'a>
1365        + Copy
1366        + RawFixedBytes
1367        + Sync
1368        + Send
1369        + 'static
1370        + Eq
1371        + Hash
1372        + Debug,
1373    LISTENER: CmdTunnelListener<M, R, W>,
1374> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1375{
1376    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
1377        self.service.register_cmd_handler(cmd, handler);
1378    }
1379
1380    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
1381        self.service.send(peer_id, cmd, version, body).await
1382    }
1383
1384    async fn send_with_resp(
1385        &self,
1386        peer_id: &PeerId,
1387        cmd: CMD,
1388        version: u8,
1389        body: &[u8],
1390        timeout: Duration,
1391    ) -> CmdResult<CmdBody> {
1392        self.service
1393            .send_with_resp(peer_id, cmd, version, body, timeout)
1394            .await
1395    }
1396
1397    async fn send2(
1398        &self,
1399        peer_id: &PeerId,
1400        cmd: CMD,
1401        version: u8,
1402        body: &[&[u8]],
1403    ) -> CmdResult<()> {
1404        self.service.send2(peer_id, cmd, version, body).await
1405    }
1406
1407    async fn send2_with_resp(
1408        &self,
1409        peer_id: &PeerId,
1410        cmd: CMD,
1411        version: u8,
1412        body: &[&[u8]],
1413        timeout: Duration,
1414    ) -> CmdResult<CmdBody> {
1415        self.service
1416            .send2_with_resp(peer_id, cmd, version, body, timeout)
1417            .await
1418    }
1419
1420    async fn send_cmd(
1421        &self,
1422        peer_id: &PeerId,
1423        cmd: CMD,
1424        version: u8,
1425        body: CmdBody,
1426    ) -> CmdResult<()> {
1427        self.service.send_cmd(peer_id, cmd, version, body).await
1428    }
1429
1430    async fn send_cmd_with_resp(
1431        &self,
1432        peer_id: &PeerId,
1433        cmd: CMD,
1434        version: u8,
1435        body: CmdBody,
1436        timeout: Duration,
1437    ) -> CmdResult<CmdBody> {
1438        self.service
1439            .send_cmd_with_resp(peer_id, cmd, version, body, timeout)
1440            .await
1441    }
1442
1443    async fn send_by_specify_tunnel(
1444        &self,
1445        peer_id: &PeerId,
1446        tunnel_id: TunnelId,
1447        cmd: CMD,
1448        version: u8,
1449        body: &[u8],
1450    ) -> CmdResult<()> {
1451        self.service
1452            .send_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1453            .await
1454    }
1455
1456    async fn send_by_specify_tunnel_with_resp(
1457        &self,
1458        peer_id: &PeerId,
1459        tunnel_id: TunnelId,
1460        cmd: CMD,
1461        version: u8,
1462        body: &[u8],
1463        timeout: Duration,
1464    ) -> CmdResult<CmdBody> {
1465        self.service
1466            .send_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1467            .await
1468    }
1469
1470    async fn send2_by_specify_tunnel(
1471        &self,
1472        peer_id: &PeerId,
1473        tunnel_id: TunnelId,
1474        cmd: CMD,
1475        version: u8,
1476        body: &[&[u8]],
1477    ) -> CmdResult<()> {
1478        self.service
1479            .send2_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1480            .await
1481    }
1482
1483    async fn send2_by_specify_tunnel_with_resp(
1484        &self,
1485        peer_id: &PeerId,
1486        tunnel_id: TunnelId,
1487        cmd: CMD,
1488        version: u8,
1489        body: &[&[u8]],
1490        timeout: Duration,
1491    ) -> CmdResult<CmdBody> {
1492        self.service
1493            .send2_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1494            .await
1495    }
1496
1497    async fn send_cmd_by_specify_tunnel(
1498        &self,
1499        peer_id: &PeerId,
1500        tunnel_id: TunnelId,
1501        cmd: CMD,
1502        version: u8,
1503        body: CmdBody,
1504    ) -> CmdResult<()> {
1505        self.service
1506            .send_cmd_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1507            .await
1508    }
1509
1510    async fn send_cmd_by_specify_tunnel_with_resp(
1511        &self,
1512        peer_id: &PeerId,
1513        tunnel_id: TunnelId,
1514        cmd: CMD,
1515        version: u8,
1516        body: CmdBody,
1517        timeout: Duration,
1518    ) -> CmdResult<CmdBody> {
1519        self.service
1520            .send_cmd_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1521            .await
1522    }
1523
1524    async fn send_by_all_tunnels(
1525        &self,
1526        peer_id: &PeerId,
1527        cmd: CMD,
1528        version: u8,
1529        body: &[u8],
1530    ) -> CmdResult<()> {
1531        self.service
1532            .send_by_all_tunnels(peer_id, cmd, version, body)
1533            .await
1534    }
1535
1536    async fn send2_by_all_tunnels(
1537        &self,
1538        peer_id: &PeerId,
1539        cmd: CMD,
1540        version: u8,
1541        body: &[&[u8]],
1542    ) -> CmdResult<()> {
1543        self.service
1544            .send2_by_all_tunnels(peer_id, cmd, version, body)
1545            .await
1546    }
1547}