Skip to main content

sfo_cmd_server/server/
server.rs

1use super::CmdServer;
2use super::peer_manager::{PeerManager, PeerManagerRef};
3use crate::client::{RespWaiter, RespWaiterRef, gen_resp_id, gen_seq};
4use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
5use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
6use crate::peer_connection::PeerConnection;
7use crate::peer_id::PeerId;
8use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
9use async_named_locker::{NamedStateHolder, ObjectHolder};
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_split::Splittable;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::marker::PhantomData;
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
20
21#[async_trait::async_trait]
22pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
23    Send + Sync + 'static
24{
25    async fn accept(&self) -> CmdResult<Splittable<R, W>>;
26}
27
28#[async_trait::async_trait]
29pub trait CmdTunnelService<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
30    Send + Sync + 'static
31{
32    async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()>;
33}
34
35#[async_trait::async_trait]
36pub trait CmdServerEventListener: Send + Sync + 'static {
37    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
38    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
39}
40
41#[derive(Clone)]
42struct CmdServerEventListenerEmit {
43    listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
44}
45
46impl CmdServerEventListenerEmit {
47    pub fn new() -> Self {
48        Self {
49            listeners: Arc::new(Mutex::new(Vec::new())),
50        }
51    }
52
53    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
54        self.listeners.lock().unwrap().push(event_listener);
55    }
56}
57
58#[async_trait::async_trait]
59impl CmdServerEventListener for CmdServerEventListenerEmit {
60    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
61        let listeners = { self.listeners.lock().unwrap().clone() };
62        for listener in listeners.iter() {
63            if let Err(e) = listener.on_peer_connected(peer_id).await {
64                log::error!("on_peer_connected error: {:?}", e);
65            }
66        }
67        Ok(())
68    }
69
70    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
71        let listeners = { self.listeners.lock().unwrap().clone() };
72        for listener in listeners.iter() {
73            if let Err(e) = listener.on_peer_disconnected(peer_id).await {
74                log::error!("on_peer_disconnected error: {:?}", e);
75            }
76        }
77        Ok(())
78    }
79}
80
81pub struct DefaultCmdServerService<
82    M: CmdTunnelMeta,
83    R: CmdTunnelRead<M>,
84    W: CmdTunnelWrite<M>,
85    LEN,
86    CMD,
87> {
88    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
89    peer_manager: PeerManagerRef<M, R, W>,
90    event_emit: CmdServerEventListenerEmit,
91    resp_waiter: RespWaiterRef,
92    state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
93    _p: PhantomData<fn() -> (M, R, W, LEN, CMD)>,
94}
95
96impl<
97    M: CmdTunnelMeta,
98    R: CmdTunnelRead<M>,
99    W: CmdTunnelWrite<M>,
100    LEN: RawEncode
101        + for<'a> RawDecode<'a>
102        + Copy
103        + RawFixedBytes
104        + Sync
105        + Send
106        + 'static
107        + FromPrimitive
108        + ToPrimitive,
109    CMD: RawEncode
110        + for<'a> RawDecode<'a>
111        + Copy
112        + RawFixedBytes
113        + Sync
114        + Send
115        + 'static
116        + Eq
117        + Hash
118        + Debug,
119> DefaultCmdServerService<M, R, W, LEN, CMD>
120{
121    pub fn new() -> Arc<Self> {
122        let event_emit = CmdServerEventListenerEmit::new();
123        Arc::new(Self {
124            cmd_handler_map: Arc::new(CmdHandlerMap::new()),
125            peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
126            event_emit,
127            resp_waiter: Arc::new(RespWaiter::new()),
128            state_holder: NamedStateHolder::new(),
129            _p: PhantomData,
130        })
131    }
132
133    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
134        self.event_emit.attach_event_listener(event_listener);
135    }
136
137    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
138        self.peer_manager.find_connections(peer_id)
139    }
140
141    pub async fn serve_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
142        let peer_id = tunnel.get_remote_peer_id();
143        let tunnel_id = self.peer_manager.generate_conn_id();
144        let cmd_handler_map = self.cmd_handler_map.clone();
145        let resp_waiter = self.resp_waiter.clone();
146        let state_holder = self.state_holder.clone();
147        let (mut reader, writer) = tunnel.split();
148        let local_id = reader.get_local_peer_id();
149        let writer = ObjectHolder::new(writer);
150        let resp_write = writer.clone();
151        let remote_id = peer_id.clone();
152        let recv_handle = tokio::spawn(async move {
153            let ret: CmdResult<()> = async move {
154                loop {
155                    let header_len = reader
156                        .read_u8()
157                        .await
158                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
159                    let mut header = vec![0u8; header_len as usize];
160                    let n = reader
161                        .read_exact(&mut header)
162                        .await
163                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
164                    if n == 0 {
165                        break;
166                    }
167                    let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
168                        .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
169                    sfo_log::debug!("recv cmd {:?}", header.cmd_code());
170                    let body_len = header.pkg_len().to_u64().unwrap();
171                    let cmd_read = CmdBodyRead::new(reader, body_len as usize);
172                    let waiter = cmd_read.get_waiter();
173                    let future = waiter
174                        .create_result_future()
175                        .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
176                    {
177                        let body = CmdBody::from_reader(BufReader::new(cmd_read), body_len);
178                        if header.is_resp() && header.seq().is_some() {
179                            let resp_id =
180                                gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
181                            let _ = resp_waiter.set_result(resp_id, body);
182                        } else if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
183                            let version = header.version();
184                            let seq = header.seq();
185                            let cmd_code = header.cmd_code();
186                            match {
187                                let _handle_state = state_holder.new_state(tokio::task::id());
188                                handler
189                                    .handle(
190                                        local_id.clone(),
191                                        remote_id.clone(),
192                                        tunnel_id,
193                                        header,
194                                        body,
195                                    )
196                                    .await
197                            } {
198                                Ok(Some(mut body)) => {
199                                    let mut write = resp_write.get().await;
200                                    let header = CmdHeader::<LEN, CMD>::new(
201                                        version,
202                                        true,
203                                        seq,
204                                        cmd_code,
205                                        LEN::from_u64(body.len()).unwrap(),
206                                    );
207                                    let buf = header
208                                        .to_vec()
209                                        .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
210                                    if buf.len() > 255 {
211                                        return Err(cmd_err!(
212                                            CmdErrorCode::RawCodecError,
213                                            "header len too large"
214                                        ));
215                                    }
216                                    write
217                                        .write_u8(buf.len() as u8)
218                                        .await
219                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
220                                    write
221                                        .write_all(buf.as_slice())
222                                        .await
223                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
224                                    tokio::io::copy(&mut body, write.deref_mut().deref_mut())
225                                        .await
226                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
227                                    write
228                                        .flush()
229                                        .await
230                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
231                                }
232                                Err(e) => {
233                                    log::error!("handle cmd error: {:?}", e);
234                                }
235                                Ok(None) => {}
236                            }
237                        }
238                    }
239                    reader = future
240                        .await
241                        .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
242                }
243                Ok(())
244            }
245            .await;
246            if let Err(e) = ret.as_ref() {
247                log::error!("recv cmd error: {:?}", e);
248            }
249            ret
250        });
251
252        let peer_conn = PeerConnection {
253            conn_id: tunnel_id,
254            peer_id,
255            send: writer,
256            handle: Some(recv_handle),
257        };
258        self.peer_manager.add_peer_connection(peer_conn).await;
259        Ok(())
260    }
261}
262
263#[async_trait::async_trait]
264impl<
265    M: CmdTunnelMeta,
266    R: CmdTunnelRead<M>,
267    W: CmdTunnelWrite<M>,
268    LEN: RawEncode
269        + for<'a> RawDecode<'a>
270        + Copy
271        + RawFixedBytes
272        + Sync
273        + Send
274        + 'static
275        + FromPrimitive
276        + ToPrimitive,
277    CMD: RawEncode
278        + for<'a> RawDecode<'a>
279        + Copy
280        + RawFixedBytes
281        + Sync
282        + Send
283        + 'static
284        + Eq
285        + Hash
286        + Debug,
287> CmdTunnelService<M, R, W> for DefaultCmdServerService<M, R, W, LEN, CMD>
288{
289    async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
290        self.serve_tunnel(tunnel).await
291    }
292}
293
294#[async_trait::async_trait]
295impl<
296    M: CmdTunnelMeta,
297    R: CmdTunnelRead<M>,
298    W: CmdTunnelWrite<M>,
299    LEN: RawEncode
300        + for<'a> RawDecode<'a>
301        + Copy
302        + RawFixedBytes
303        + Sync
304        + Send
305        + 'static
306        + FromPrimitive
307        + ToPrimitive,
308    CMD: RawEncode
309        + for<'a> RawDecode<'a>
310        + Copy
311        + RawFixedBytes
312        + Sync
313        + Send
314        + 'static
315        + Eq
316        + Hash
317        + Debug,
318> CmdServer<LEN, CMD> for DefaultCmdServerService<M, R, W, LEN, CMD>
319{
320    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
321        self.cmd_handler_map.insert(cmd, handler);
322    }
323
324    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
325        let connections = self.peer_manager.find_connections(peer_id);
326        for conn in connections {
327            let ret: CmdResult<()> = async move {
328                log::debug!(
329                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
330                    peer_id,
331                    conn.conn_id,
332                    cmd,
333                    body.len(),
334                    hex::encode(body)
335                );
336                let header = CmdHeader::<LEN, CMD>::new(
337                    version,
338                    false,
339                    None,
340                    cmd,
341                    LEN::from_u64(body.len() as u64).unwrap(),
342                );
343                let buf = header
344                    .to_vec()
345                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
346                let mut send = conn.send.get().await;
347                if buf.len() > 255 {
348                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
349                }
350                send.write_u8(buf.len() as u8)
351                    .await
352                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
353                send.write_all(buf.as_slice())
354                    .await
355                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
356                send.write_all(body)
357                    .await
358                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
359                send.flush()
360                    .await
361                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
362                Ok(())
363            }
364            .await;
365            if ret.is_ok() {
366                break;
367            }
368        }
369        Ok(())
370    }
371
372    async fn send_with_resp(
373        &self,
374        peer_id: &PeerId,
375        cmd: CMD,
376        version: u8,
377        body: &[u8],
378        timeout: Duration,
379    ) -> CmdResult<CmdBody> {
380        let connections = self.peer_manager.find_connections(peer_id);
381        for conn in connections {
382            if let Some(id) = tokio::task::try_id() {
383                if self.state_holder.has_state(id) {
384                    continue;
385                }
386            }
387            let ret: CmdResult<CmdBody> = async move {
388                log::debug!(
389                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
390                    peer_id,
391                    conn.conn_id,
392                    cmd,
393                    body.len(),
394                    hex::encode(body)
395                );
396                let seq = gen_seq();
397                let header = CmdHeader::<LEN, CMD>::new(
398                    version,
399                    false,
400                    Some(seq),
401                    cmd,
402                    LEN::from_u64(body.len() as u64).unwrap(),
403                );
404                let buf = header
405                    .to_vec()
406                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
407                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
408                let waiter = self
409                    .resp_waiter
410                    .create_timeout_result_future(resp_id, timeout)
411                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
412                {
413                    let mut send = conn.send.get().await;
414                    if buf.len() > 255 {
415                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
416                    }
417                    send.write_u8(buf.len() as u8)
418                        .await
419                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
420                    send.write_all(buf.as_slice())
421                        .await
422                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
423                    send.write_all(body)
424                        .await
425                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
426                    send.flush()
427                        .await
428                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
429                }
430                let body =
431                    waiter
432                        .await
433                        .map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
434                Ok(body)
435            }
436            .await;
437            if ret.is_ok() {
438                return ret;
439            } else {
440                sfo_log::error!("send err {:?}", ret.unwrap_err());
441            }
442        }
443        Err(cmd_err!(
444            CmdErrorCode::Failed,
445            "send to peer_id: {}",
446            peer_id
447        ))
448    }
449
450    async fn send2(
451        &self,
452        peer_id: &PeerId,
453        cmd: CMD,
454        version: u8,
455        body: &[&[u8]],
456    ) -> CmdResult<()> {
457        let connections = self.peer_manager.find_connections(peer_id);
458        for conn in connections {
459            let ret: CmdResult<()> = async move {
460                let mut len = 0;
461                for b in body.iter() {
462                    len += b.len();
463                    log::debug!(
464                        "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
465                        peer_id,
466                        conn.conn_id,
467                        cmd,
468                        hex::encode(b)
469                    );
470                }
471                log::debug!(
472                    "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
473                    peer_id,
474                    conn.conn_id,
475                    cmd,
476                    len
477                );
478                let header = CmdHeader::<LEN, CMD>::new(
479                    version,
480                    false,
481                    None,
482                    cmd,
483                    LEN::from_u64(len as u64).unwrap(),
484                );
485                let buf = header
486                    .to_vec()
487                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
488                let mut send = conn.send.get().await;
489                if buf.len() > 255 {
490                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
491                }
492                send.write_u8(buf.len() as u8)
493                    .await
494                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
495                send.write_all(buf.as_slice())
496                    .await
497                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
498                for b in body.iter() {
499                    send.write_all(b)
500                        .await
501                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
502                }
503                send.flush()
504                    .await
505                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
506                Ok(())
507            }
508            .await;
509            if ret.is_ok() {
510                break;
511            }
512        }
513        Ok(())
514    }
515
516    async fn send2_with_resp(
517        &self,
518        peer_id: &PeerId,
519        cmd: CMD,
520        version: u8,
521        body: &[&[u8]],
522        timeout: Duration,
523    ) -> CmdResult<CmdBody> {
524        let connections = self.peer_manager.find_connections(peer_id);
525        for conn in connections {
526            if let Some(id) = tokio::task::try_id() {
527                if self.state_holder.has_state(id) {
528                    continue;
529                }
530            }
531            let ret: CmdResult<CmdBody> = async move {
532                let mut len = 0;
533                for b in body.iter() {
534                    len += b.len();
535                    log::debug!(
536                        "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
537                        peer_id,
538                        conn.conn_id,
539                        cmd,
540                        hex::encode(b)
541                    );
542                }
543                log::debug!(
544                    "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
545                    peer_id,
546                    conn.conn_id,
547                    cmd,
548                    len
549                );
550                let seq = gen_seq();
551                let header = CmdHeader::<LEN, CMD>::new(
552                    version,
553                    false,
554                    Some(seq),
555                    cmd,
556                    LEN::from_u64(len as u64).unwrap(),
557                );
558                let buf = header
559                    .to_vec()
560                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
561                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
562                let waiter = self
563                    .resp_waiter
564                    .create_timeout_result_future(resp_id, timeout)
565                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
566                {
567                    let mut send = conn.send.get().await;
568                    if buf.len() > 255 {
569                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
570                    }
571                    send.write_u8(buf.len() as u8)
572                        .await
573                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
574                    send.write_all(buf.as_slice())
575                        .await
576                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
577                    for b in body.iter() {
578                        send.write_all(b)
579                            .await
580                            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
581                    }
582                    send.flush()
583                        .await
584                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
585                }
586                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
587                Ok(body)
588            }
589            .await;
590            if ret.is_ok() {
591                return ret;
592            }
593        }
594        Err(cmd_err!(
595            CmdErrorCode::Failed,
596            "send to peer_id: {}",
597            peer_id
598        ))
599    }
600
601    async fn send_cmd(
602        &self,
603        peer_id: &PeerId,
604        cmd: CMD,
605        version: u8,
606        body: CmdBody,
607    ) -> CmdResult<()> {
608        let body_data = body.into_bytes().await?;
609        let body = body_data.as_slice();
610        let connections = self.peer_manager.find_connections(peer_id);
611        for conn in connections {
612            let ret: CmdResult<()> = async move {
613                log::debug!(
614                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
615                    peer_id,
616                    conn.conn_id,
617                    cmd,
618                    body.len(),
619                    hex::encode(body)
620                );
621                let header = CmdHeader::<LEN, CMD>::new(
622                    version,
623                    false,
624                    None,
625                    cmd,
626                    LEN::from_u64(body.len() as u64).unwrap(),
627                );
628                let buf = header
629                    .to_vec()
630                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
631                let mut send = conn.send.get().await;
632                if buf.len() > 255 {
633                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
634                }
635                send.write_u8(buf.len() as u8)
636                    .await
637                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
638                send.write_all(buf.as_slice())
639                    .await
640                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
641                send.write_all(body)
642                    .await
643                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
644                send.flush()
645                    .await
646                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
647                Ok(())
648            }
649            .await;
650            if ret.is_ok() {
651                break;
652            }
653        }
654        Ok(())
655    }
656
657    async fn send_cmd_with_resp(
658        &self,
659        peer_id: &PeerId,
660        cmd: CMD,
661        version: u8,
662        body: CmdBody,
663        timeout: Duration,
664    ) -> CmdResult<CmdBody> {
665        let connections = self.peer_manager.find_connections(peer_id);
666        let body_data = body.into_bytes().await?;
667        let data_ref = body_data.as_slice();
668        for conn in connections {
669            if let Some(id) = tokio::task::try_id() {
670                if self.state_holder.has_state(id) {
671                    continue;
672                }
673            }
674            let ret: CmdResult<CmdBody> = async move {
675                log::debug!(
676                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}",
677                    peer_id,
678                    conn.conn_id,
679                    cmd,
680                    data_ref.len()
681                );
682                let seq = gen_seq();
683                let header = CmdHeader::<LEN, CMD>::new(
684                    version,
685                    false,
686                    Some(seq),
687                    cmd,
688                    LEN::from_u64(data_ref.len() as u64).unwrap(),
689                );
690                let buf = header
691                    .to_vec()
692                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
693                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
694                let waiter = self
695                    .resp_waiter
696                    .create_timeout_result_future(resp_id, timeout)
697                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
698                {
699                    let mut send = conn.send.get().await;
700                    if buf.len() > 255 {
701                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
702                    }
703                    send.write_u8(buf.len() as u8)
704                        .await
705                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
706                    send.write_all(buf.as_slice())
707                        .await
708                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
709                    send.write_all(data_ref)
710                        .await
711                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
712                    send.flush()
713                        .await
714                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
715                }
716                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
717                Ok(body)
718            }
719            .await;
720            if ret.is_ok() {
721                return ret;
722            }
723        }
724        Err(cmd_err!(
725            CmdErrorCode::Failed,
726            "send to peer_id: {}",
727            peer_id
728        ))
729    }
730
731    async fn send_by_specify_tunnel(
732        &self,
733        peer_id: &PeerId,
734        tunnel_id: TunnelId,
735        cmd: CMD,
736        version: u8,
737        body: &[u8],
738    ) -> CmdResult<()> {
739        let conn = self.peer_manager.find_connection(tunnel_id);
740        if conn.is_none() {
741            return Err(cmd_err!(
742                CmdErrorCode::PeerConnectionNotFound,
743                "tunnel_id: {:?}",
744                tunnel_id
745            ));
746        }
747        let conn = conn.unwrap();
748        assert_eq!(tunnel_id, conn.conn_id);
749        log::trace!(
750            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
751            peer_id,
752            conn.conn_id,
753            cmd,
754            body.len(),
755            hex::encode(body)
756        );
757        let header = CmdHeader::<LEN, CMD>::new(
758            version,
759            false,
760            None,
761            cmd,
762            LEN::from_u64(body.len() as u64).unwrap(),
763        );
764        let buf = header
765            .to_vec()
766            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
767        let mut send = conn.send.get().await;
768        if buf.len() > 255 {
769            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
770        }
771        send.write_u8(buf.len() as u8)
772            .await
773            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
774        send.write_all(buf.as_slice())
775            .await
776            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
777        send.write_all(body)
778            .await
779            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
780        send.flush()
781            .await
782            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
783        Ok(())
784    }
785
786    async fn send_by_specify_tunnel_with_resp(
787        &self,
788        peer_id: &PeerId,
789        tunnel_id: TunnelId,
790        cmd: CMD,
791        version: u8,
792        body: &[u8],
793        timeout: Duration,
794    ) -> CmdResult<CmdBody> {
795        let conn = self.peer_manager.find_connection(tunnel_id);
796        if conn.is_none() {
797            return Err(cmd_err!(
798                CmdErrorCode::PeerConnectionNotFound,
799                "tunnel_id: {:?}",
800                tunnel_id
801            ));
802        }
803        let conn = conn.unwrap();
804        if let Some(id) = tokio::task::try_id() {
805            if self.state_holder.has_state(id) {
806                return Err(cmd_err!(
807                    CmdErrorCode::Failed,
808                    "can't send msg with resp in tunnel {:?} msg handle",
809                    conn.conn_id
810                ));
811            }
812        }
813        assert_eq!(tunnel_id, conn.conn_id);
814        log::trace!(
815            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
816            peer_id,
817            conn.conn_id,
818            cmd,
819            body.len(),
820            hex::encode(body)
821        );
822        let seq = gen_seq();
823        let header = CmdHeader::<LEN, CMD>::new(
824            version,
825            false,
826            Some(seq),
827            cmd,
828            LEN::from_u64(body.len() as u64).unwrap(),
829        );
830        let buf = header
831            .to_vec()
832            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
833        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
834        let waiter = self
835            .resp_waiter
836            .create_timeout_result_future(resp_id, timeout)
837            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
838        {
839            let mut send = conn.send.get().await;
840            if buf.len() > 255 {
841                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
842            }
843            send.write_u8(buf.len() as u8)
844                .await
845                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
846            send.write_all(buf.as_slice())
847                .await
848                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
849            send.write_all(body)
850                .await
851                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
852            send.flush()
853                .await
854                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
855        }
856        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
857        Ok(body)
858    }
859
860    async fn send2_by_specify_tunnel(
861        &self,
862        peer_id: &PeerId,
863        tunnel_id: TunnelId,
864        cmd: CMD,
865        version: u8,
866        body: &[&[u8]],
867    ) -> CmdResult<()> {
868        let conn = self.peer_manager.find_connection(tunnel_id);
869        if conn.is_none() {
870            return Err(cmd_err!(
871                CmdErrorCode::PeerConnectionNotFound,
872                "tunnel_id: {:?}",
873                tunnel_id
874            ));
875        }
876        let conn = conn.unwrap();
877        assert_eq!(tunnel_id, conn.conn_id);
878        let mut len = 0;
879        for b in body.iter() {
880            len += b.len();
881            log::debug!(
882                "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
883                peer_id,
884                conn.conn_id,
885                cmd,
886                hex::encode(b)
887            );
888        }
889        log::debug!(
890            "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
891            peer_id,
892            conn.conn_id,
893            cmd,
894            len
895        );
896        let header = CmdHeader::<LEN, CMD>::new(
897            version,
898            false,
899            None,
900            cmd,
901            LEN::from_u64(len as u64).unwrap(),
902        );
903        let buf = header
904            .to_vec()
905            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
906        if buf.len() > 255 {
907            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
908        }
909        let mut send = conn.send.get().await;
910        send.write_u8(buf.len() as u8)
911            .await
912            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
913        send.write_all(buf.as_slice())
914            .await
915            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
916        for b in body.iter() {
917            send.write_all(b)
918                .await
919                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
920        }
921        send.flush()
922            .await
923            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
924        Ok(())
925    }
926
927    async fn send2_by_specify_tunnel_with_resp(
928        &self,
929        peer_id: &PeerId,
930        tunnel_id: TunnelId,
931        cmd: CMD,
932        version: u8,
933        body: &[&[u8]],
934        timeout: Duration,
935    ) -> CmdResult<CmdBody> {
936        let conn = self.peer_manager.find_connection(tunnel_id);
937        if conn.is_none() {
938            return Err(cmd_err!(
939                CmdErrorCode::PeerConnectionNotFound,
940                "tunnel_id: {:?}",
941                tunnel_id
942            ));
943        }
944        let conn = conn.unwrap();
945        if let Some(id) = tokio::task::try_id() {
946            if self.state_holder.has_state(id) {
947                return Err(cmd_err!(
948                    CmdErrorCode::Failed,
949                    "can't send msg with resp in tunnel {:?} msg handle",
950                    conn.conn_id
951                ));
952            }
953        }
954        assert_eq!(tunnel_id, conn.conn_id);
955        let mut len = 0;
956        for b in body.iter() {
957            len += b.len();
958            log::debug!(
959                "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
960                peer_id,
961                conn.conn_id,
962                cmd,
963                hex::encode(b)
964            );
965        }
966        log::debug!(
967            "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
968            peer_id,
969            conn.conn_id,
970            cmd,
971            len
972        );
973        let seq = gen_seq();
974        let header = CmdHeader::<LEN, CMD>::new(
975            version,
976            false,
977            Some(seq),
978            cmd,
979            LEN::from_u64(len as u64).unwrap(),
980        );
981        let buf = header
982            .to_vec()
983            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
984        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
985        let waiter = self
986            .resp_waiter
987            .create_timeout_result_future(resp_id, timeout)
988            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
989        if buf.len() > 255 {
990            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
991        }
992        {
993            let mut send = conn.send.get().await;
994            send.write_u8(buf.len() as u8)
995                .await
996                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
997            send.write_all(buf.as_slice())
998                .await
999                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1000            for b in body.iter() {
1001                send.write_all(b)
1002                    .await
1003                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1004            }
1005            send.flush()
1006                .await
1007                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1008        }
1009        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1010        Ok(body)
1011    }
1012
1013    async fn send_cmd_by_specify_tunnel(
1014        &self,
1015        peer_id: &PeerId,
1016        tunnel_id: TunnelId,
1017        cmd: CMD,
1018        version: u8,
1019        mut body: CmdBody,
1020    ) -> CmdResult<()> {
1021        let conn = self.peer_manager.find_connection(tunnel_id);
1022        if conn.is_none() {
1023            return Err(cmd_err!(
1024                CmdErrorCode::PeerConnectionNotFound,
1025                "tunnel_id: {:?}",
1026                tunnel_id
1027            ));
1028        }
1029        let conn = conn.unwrap();
1030        assert_eq!(tunnel_id, conn.conn_id);
1031        log::debug!(
1032            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1033            peer_id,
1034            conn.conn_id,
1035            cmd,
1036            body.len()
1037        );
1038        let header = CmdHeader::<LEN, CMD>::new(
1039            version,
1040            false,
1041            None,
1042            cmd,
1043            LEN::from_u64(body.len()).unwrap(),
1044        );
1045        let buf = header
1046            .to_vec()
1047            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1048        let mut send = conn.send.get().await;
1049        if buf.len() > 255 {
1050            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1051        }
1052        send.write_u8(buf.len() as u8)
1053            .await
1054            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1055        send.write_all(buf.as_slice())
1056            .await
1057            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1058        tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1059            .await
1060            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1061        send.flush()
1062            .await
1063            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1064        Ok(())
1065    }
1066
1067    async fn send_cmd_by_specify_tunnel_with_resp(
1068        &self,
1069        peer_id: &PeerId,
1070        tunnel_id: TunnelId,
1071        cmd: CMD,
1072        version: u8,
1073        mut body: CmdBody,
1074        timeout: Duration,
1075    ) -> CmdResult<CmdBody> {
1076        let conn = self.peer_manager.find_connection(tunnel_id);
1077        if conn.is_none() {
1078            return Err(cmd_err!(
1079                CmdErrorCode::PeerConnectionNotFound,
1080                "tunnel_id: {:?}",
1081                tunnel_id
1082            ));
1083        }
1084        let conn = conn.unwrap();
1085        if let Some(id) = tokio::task::try_id() {
1086            if self.state_holder.has_state(id) {
1087                return Err(cmd_err!(
1088                    CmdErrorCode::Failed,
1089                    "can't send msg with resp in tunnel {:?} msg handle",
1090                    conn.conn_id
1091                ));
1092            }
1093        }
1094        assert_eq!(tunnel_id, conn.conn_id);
1095        log::debug!(
1096            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1097            peer_id,
1098            conn.conn_id,
1099            cmd,
1100            body.len()
1101        );
1102        let seq = gen_seq();
1103        let header = CmdHeader::<LEN, CMD>::new(
1104            version,
1105            false,
1106            Some(seq),
1107            cmd,
1108            LEN::from_u64(body.len()).unwrap(),
1109        );
1110        let buf = header
1111            .to_vec()
1112            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1113        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1114        let waiter = self
1115            .resp_waiter
1116            .create_timeout_result_future(resp_id, timeout)
1117            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1118        {
1119            let mut send = conn.send.get().await;
1120            if buf.len() > 255 {
1121                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1122            }
1123            send.write_u8(buf.len() as u8)
1124                .await
1125                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1126            send.write_all(buf.as_slice())
1127                .await
1128                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1129            tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1130                .await
1131                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1132            send.flush()
1133                .await
1134                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1135        }
1136        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1137        Ok(body)
1138    }
1139
1140    async fn send_by_all_tunnels(
1141        &self,
1142        peer_id: &PeerId,
1143        cmd: CMD,
1144        version: u8,
1145        body: &[u8],
1146    ) -> CmdResult<()> {
1147        let connections = self.peer_manager.find_connections(peer_id);
1148        for conn in connections {
1149            let _ret: CmdResult<()> = async move {
1150                let header = CmdHeader::<LEN, CMD>::new(
1151                    version,
1152                    false,
1153                    None,
1154                    cmd,
1155                    LEN::from_u64(body.len() as u64).unwrap(),
1156                );
1157                let buf = header
1158                    .to_vec()
1159                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1160                let mut send = conn.send.get().await;
1161                send.write_u8(buf.len() as u8)
1162                    .await
1163                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1164                send.write_all(buf.as_slice())
1165                    .await
1166                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1167                send.write_all(body)
1168                    .await
1169                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1170                send.flush()
1171                    .await
1172                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1173                Ok(())
1174            }
1175            .await;
1176        }
1177        Ok(())
1178    }
1179
1180    async fn send2_by_all_tunnels(
1181        &self,
1182        peer_id: &PeerId,
1183        cmd: CMD,
1184        version: u8,
1185        body: &[&[u8]],
1186    ) -> CmdResult<()> {
1187        let connections = self.peer_manager.find_connections(peer_id);
1188        let mut len = 0;
1189        for b in body.iter() {
1190            len += b.len();
1191        }
1192        for conn in connections {
1193            let _ret: CmdResult<()> = async move {
1194                let header = CmdHeader::<LEN, CMD>::new(
1195                    version,
1196                    false,
1197                    None,
1198                    cmd,
1199                    LEN::from_u64(len as u64).unwrap(),
1200                );
1201                let buf = header
1202                    .to_vec()
1203                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1204                if buf.len() > 255 {
1205                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1206                }
1207                let mut send = conn.send.get().await;
1208                send.write_u8(buf.len() as u8)
1209                    .await
1210                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1211                send.write_all(buf.as_slice())
1212                    .await
1213                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1214                for b in body.iter() {
1215                    send.write_all(b)
1216                        .await
1217                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1218                }
1219                send.flush()
1220                    .await
1221                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1222                Ok(())
1223            }
1224            .await;
1225        }
1226        Ok(())
1227    }
1228}
1229
1230pub struct DefaultCmdServerIncoming<
1231    M: CmdTunnelMeta,
1232    R: CmdTunnelRead<M>,
1233    W: CmdTunnelWrite<M>,
1234    LISTENER,
1235> {
1236    tunnel_listener: LISTENER,
1237    tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1238    _p: PhantomData<fn() -> (M, R, W)>,
1239}
1240
1241impl<
1242    M: CmdTunnelMeta,
1243    R: CmdTunnelRead<M>,
1244    W: CmdTunnelWrite<M>,
1245    LISTENER: CmdTunnelListener<M, R, W>,
1246> DefaultCmdServerIncoming<M, R, W, LISTENER>
1247{
1248    pub fn new(
1249        tunnel_listener: LISTENER,
1250        tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1251    ) -> Arc<Self> {
1252        Arc::new(Self {
1253            tunnel_listener,
1254            tunnel_service,
1255            _p: PhantomData,
1256        })
1257    }
1258
1259    pub fn start(self: &Arc<Self>) {
1260        let this = self.clone();
1261        tokio::spawn(async move {
1262            if let Err(e) = this.run().await {
1263                log::error!("cmd server error: {:?}", e);
1264            }
1265        });
1266    }
1267
1268    pub async fn run(&self) -> CmdResult<()> {
1269        loop {
1270            let tunnel = self.tunnel_listener.accept().await?;
1271            let tunnel_service = self.tunnel_service.clone();
1272            tokio::spawn(async move {
1273                if let Err(e) = tunnel_service.handle_tunnel(tunnel).await {
1274                    log::error!("peer connection error: {:?}", e);
1275                }
1276            });
1277        }
1278    }
1279}
1280
1281pub struct DefaultCmdServer<
1282    M: CmdTunnelMeta,
1283    R: CmdTunnelRead<M>,
1284    W: CmdTunnelWrite<M>,
1285    LEN,
1286    CMD,
1287    LISTENER,
1288> {
1289    incoming: Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>>,
1290    service: Arc<DefaultCmdServerService<M, R, W, LEN, CMD>>,
1291}
1292
1293impl<
1294    M: CmdTunnelMeta,
1295    R: CmdTunnelRead<M>,
1296    W: CmdTunnelWrite<M>,
1297    LEN: RawEncode
1298        + for<'a> RawDecode<'a>
1299        + Copy
1300        + RawFixedBytes
1301        + Sync
1302        + Send
1303        + 'static
1304        + FromPrimitive
1305        + ToPrimitive,
1306    CMD: RawEncode
1307        + for<'a> RawDecode<'a>
1308        + Copy
1309        + RawFixedBytes
1310        + Sync
1311        + Send
1312        + 'static
1313        + Eq
1314        + Hash
1315        + Debug,
1316    LISTENER: CmdTunnelListener<M, R, W>,
1317> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1318{
1319    pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
1320        let service = DefaultCmdServerService::new();
1321        let incoming = DefaultCmdServerIncoming::new(tunnel_listener, service.clone());
1322        Arc::new(Self { incoming, service })
1323    }
1324
1325    pub fn incoming(&self) -> Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>> {
1326        self.incoming.clone()
1327    }
1328
1329    pub fn service(&self) -> Arc<DefaultCmdServerService<M, R, W, LEN, CMD>> {
1330        self.service.clone()
1331    }
1332
1333    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
1334        self.service.attach_event_listener(event_listener);
1335    }
1336
1337    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
1338        self.service.get_peer_tunnels(peer_id).await
1339    }
1340
1341    pub fn start(self: &Arc<Self>) {
1342        self.incoming.start();
1343    }
1344}
1345
1346impl<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> Deref
1347    for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1348{
1349    type Target = DefaultCmdServerService<M, R, W, LEN, CMD>;
1350
1351    fn deref(&self) -> &Self::Target {
1352        self.service.as_ref()
1353    }
1354}
1355
1356#[async_trait::async_trait]
1357impl<
1358    M: CmdTunnelMeta,
1359    R: CmdTunnelRead<M>,
1360    W: CmdTunnelWrite<M>,
1361    LEN: RawEncode
1362        + for<'a> RawDecode<'a>
1363        + Copy
1364        + RawFixedBytes
1365        + Sync
1366        + Send
1367        + 'static
1368        + FromPrimitive
1369        + ToPrimitive,
1370    CMD: RawEncode
1371        + for<'a> RawDecode<'a>
1372        + Copy
1373        + RawFixedBytes
1374        + Sync
1375        + Send
1376        + 'static
1377        + Eq
1378        + Hash
1379        + Debug,
1380    LISTENER: CmdTunnelListener<M, R, W>,
1381> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1382{
1383    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
1384        self.service.register_cmd_handler(cmd, handler);
1385    }
1386
1387    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
1388        self.service.send(peer_id, cmd, version, body).await
1389    }
1390
1391    async fn send_with_resp(
1392        &self,
1393        peer_id: &PeerId,
1394        cmd: CMD,
1395        version: u8,
1396        body: &[u8],
1397        timeout: Duration,
1398    ) -> CmdResult<CmdBody> {
1399        self.service
1400            .send_with_resp(peer_id, cmd, version, body, timeout)
1401            .await
1402    }
1403
1404    async fn send2(
1405        &self,
1406        peer_id: &PeerId,
1407        cmd: CMD,
1408        version: u8,
1409        body: &[&[u8]],
1410    ) -> CmdResult<()> {
1411        self.service.send2(peer_id, cmd, version, body).await
1412    }
1413
1414    async fn send2_with_resp(
1415        &self,
1416        peer_id: &PeerId,
1417        cmd: CMD,
1418        version: u8,
1419        body: &[&[u8]],
1420        timeout: Duration,
1421    ) -> CmdResult<CmdBody> {
1422        self.service
1423            .send2_with_resp(peer_id, cmd, version, body, timeout)
1424            .await
1425    }
1426
1427    async fn send_cmd(
1428        &self,
1429        peer_id: &PeerId,
1430        cmd: CMD,
1431        version: u8,
1432        body: CmdBody,
1433    ) -> CmdResult<()> {
1434        self.service.send_cmd(peer_id, cmd, version, body).await
1435    }
1436
1437    async fn send_cmd_with_resp(
1438        &self,
1439        peer_id: &PeerId,
1440        cmd: CMD,
1441        version: u8,
1442        body: CmdBody,
1443        timeout: Duration,
1444    ) -> CmdResult<CmdBody> {
1445        self.service
1446            .send_cmd_with_resp(peer_id, cmd, version, body, timeout)
1447            .await
1448    }
1449
1450    async fn send_by_specify_tunnel(
1451        &self,
1452        peer_id: &PeerId,
1453        tunnel_id: TunnelId,
1454        cmd: CMD,
1455        version: u8,
1456        body: &[u8],
1457    ) -> CmdResult<()> {
1458        self.service
1459            .send_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1460            .await
1461    }
1462
1463    async fn send_by_specify_tunnel_with_resp(
1464        &self,
1465        peer_id: &PeerId,
1466        tunnel_id: TunnelId,
1467        cmd: CMD,
1468        version: u8,
1469        body: &[u8],
1470        timeout: Duration,
1471    ) -> CmdResult<CmdBody> {
1472        self.service
1473            .send_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1474            .await
1475    }
1476
1477    async fn send2_by_specify_tunnel(
1478        &self,
1479        peer_id: &PeerId,
1480        tunnel_id: TunnelId,
1481        cmd: CMD,
1482        version: u8,
1483        body: &[&[u8]],
1484    ) -> CmdResult<()> {
1485        self.service
1486            .send2_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1487            .await
1488    }
1489
1490    async fn send2_by_specify_tunnel_with_resp(
1491        &self,
1492        peer_id: &PeerId,
1493        tunnel_id: TunnelId,
1494        cmd: CMD,
1495        version: u8,
1496        body: &[&[u8]],
1497        timeout: Duration,
1498    ) -> CmdResult<CmdBody> {
1499        self.service
1500            .send2_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1501            .await
1502    }
1503
1504    async fn send_cmd_by_specify_tunnel(
1505        &self,
1506        peer_id: &PeerId,
1507        tunnel_id: TunnelId,
1508        cmd: CMD,
1509        version: u8,
1510        body: CmdBody,
1511    ) -> CmdResult<()> {
1512        self.service
1513            .send_cmd_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1514            .await
1515    }
1516
1517    async fn send_cmd_by_specify_tunnel_with_resp(
1518        &self,
1519        peer_id: &PeerId,
1520        tunnel_id: TunnelId,
1521        cmd: CMD,
1522        version: u8,
1523        body: CmdBody,
1524        timeout: Duration,
1525    ) -> CmdResult<CmdBody> {
1526        self.service
1527            .send_cmd_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1528            .await
1529    }
1530
1531    async fn send_by_all_tunnels(
1532        &self,
1533        peer_id: &PeerId,
1534        cmd: CMD,
1535        version: u8,
1536        body: &[u8],
1537    ) -> CmdResult<()> {
1538        self.service
1539            .send_by_all_tunnels(peer_id, cmd, version, body)
1540            .await
1541    }
1542
1543    async fn send2_by_all_tunnels(
1544        &self,
1545        peer_id: &PeerId,
1546        cmd: CMD,
1547        version: u8,
1548        body: &[&[u8]],
1549    ) -> CmdResult<()> {
1550        self.service
1551            .send2_by_all_tunnels(peer_id, cmd, version, body)
1552            .await
1553    }
1554}