Skip to main content

sfo_cmd_server/server/
server.rs

1use super::CmdServer;
2use super::peer_manager::{PeerManager, PeerManagerRef};
3use crate::client::{RespWaiter, RespWaiterRef, gen_resp_id, gen_seq};
4use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
5use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
6use crate::peer_connection::PeerConnection;
7use crate::peer_id::PeerId;
8use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
9use async_named_locker::{NamedStateHolder, ObjectHolder};
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_split::Splittable;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::marker::PhantomData;
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
20
21#[async_trait::async_trait]
22pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
23    Send + Sync + 'static
24{
25    async fn accept(&self) -> CmdResult<Splittable<R, W>>;
26}
27
28#[async_trait::async_trait]
29pub trait CmdTunnelService<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
30    Send + Sync + 'static
31{
32    async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()>;
33}
34
35#[async_trait::async_trait]
36pub trait CmdServerEventListener: Send + Sync + 'static {
37    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
38    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
39}
40
41#[derive(Clone)]
42struct CmdServerEventListenerEmit {
43    listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
44}
45
46impl CmdServerEventListenerEmit {
47    pub fn new() -> Self {
48        Self {
49            listeners: Arc::new(Mutex::new(Vec::new())),
50        }
51    }
52
53    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
54        self.listeners.lock().unwrap().push(event_listener);
55    }
56}
57
58#[async_trait::async_trait]
59impl CmdServerEventListener for CmdServerEventListenerEmit {
60    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
61        let listeners = { self.listeners.lock().unwrap().clone() };
62        for listener in listeners.iter() {
63            if let Err(e) = listener.on_peer_connected(peer_id).await {
64                log::error!("on_peer_connected error: {:?}", e);
65            }
66        }
67        Ok(())
68    }
69
70    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
71        let listeners = { self.listeners.lock().unwrap().clone() };
72        for listener in listeners.iter() {
73            if let Err(e) = listener.on_peer_disconnected(peer_id).await {
74                log::error!("on_peer_disconnected error: {:?}", e);
75            }
76        }
77        Ok(())
78    }
79}
80
81pub struct DefaultCmdServerService<
82    M: CmdTunnelMeta,
83    R: CmdTunnelRead<M>,
84    W: CmdTunnelWrite<M>,
85    LEN,
86    CMD,
87> {
88    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
89    peer_manager: PeerManagerRef<M, R, W>,
90    event_emit: CmdServerEventListenerEmit,
91    resp_waiter: RespWaiterRef,
92    state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
93    _p: PhantomData<fn() -> (M, R, W, LEN, CMD)>,
94}
95
96impl<
97    M: CmdTunnelMeta,
98    R: CmdTunnelRead<M>,
99    W: CmdTunnelWrite<M>,
100    LEN: RawEncode
101        + for<'a> RawDecode<'a>
102        + Copy
103        + RawFixedBytes
104        + Sync
105        + Send
106        + 'static
107        + FromPrimitive
108        + ToPrimitive,
109    CMD: RawEncode
110        + for<'a> RawDecode<'a>
111        + Copy
112        + RawFixedBytes
113        + Sync
114        + Send
115        + 'static
116        + Eq
117        + Hash
118        + Debug,
119> DefaultCmdServerService<M, R, W, LEN, CMD>
120{
121    fn encode_pkg_len(len: u64) -> CmdResult<LEN> {
122        LEN::from_u64(len).ok_or_else(|| {
123            cmd_err!(
124                CmdErrorCode::InvalidParam,
125                "body len {} exceeds header len type {}",
126                len,
127                std::any::type_name::<LEN>()
128            )
129        })
130    }
131
132    pub fn new() -> Arc<Self> {
133        let event_emit = CmdServerEventListenerEmit::new();
134        Arc::new(Self {
135            cmd_handler_map: Arc::new(CmdHandlerMap::new()),
136            peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
137            event_emit,
138            resp_waiter: Arc::new(RespWaiter::new()),
139            state_holder: NamedStateHolder::new(),
140            _p: PhantomData,
141        })
142    }
143
144    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
145        self.event_emit.attach_event_listener(event_listener);
146    }
147
148    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
149        self.peer_manager.find_connections(peer_id)
150    }
151
152    pub async fn serve_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
153        let peer_id = tunnel.get_remote_peer_id();
154        let tunnel_id = self.peer_manager.generate_conn_id();
155        let cmd_handler_map = self.cmd_handler_map.clone();
156        let resp_waiter = self.resp_waiter.clone();
157        let state_holder = self.state_holder.clone();
158        let (mut reader, writer) = tunnel.split();
159        let local_id = reader.get_local_peer_id();
160        let writer = ObjectHolder::new(writer);
161        let resp_write = writer.clone();
162        let remote_id = peer_id.clone();
163        let recv_handle = tokio::spawn(async move {
164            let ret: CmdResult<()> = async move {
165                loop {
166                    let header_len = reader
167                        .read_u8()
168                        .await
169                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
170                    let mut header = vec![0u8; header_len as usize];
171                    let n = reader
172                        .read_exact(&mut header)
173                        .await
174                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
175                    if n == 0 {
176                        break;
177                    }
178                    let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
179                        .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
180                    sfo_log::debug!("recv cmd {:?}", header.cmd_code());
181                    let body_len = header.pkg_len().to_u64().unwrap();
182                    let cmd_read = CmdBodyRead::new(reader, body_len as usize);
183                    let waiter = cmd_read.get_waiter();
184                    let future = waiter
185                        .create_result_future()
186                        .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
187                    {
188                        let body = CmdBody::from_reader(BufReader::new(cmd_read), body_len);
189                        if header.is_resp() && header.seq().is_some() {
190                            let resp_id =
191                                gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
192                            let _ = resp_waiter.set_result(resp_id, body);
193                        } else if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
194                            let version = header.version();
195                            let seq = header.seq();
196                            let cmd_code = header.cmd_code();
197                            match {
198                                let _handle_state = state_holder.new_state(tokio::task::id());
199                                handler
200                                    .handle(
201                                        local_id.clone(),
202                                        remote_id.clone(),
203                                        tunnel_id,
204                                        header,
205                                        body,
206                                    )
207                                    .await
208                            } {
209                                Ok(Some(mut body)) => {
210                                    let mut write = resp_write.get().await;
211                                    let header = CmdHeader::<LEN, CMD>::new(
212                                        version,
213                                        true,
214                                        seq,
215                                        cmd_code,
216                                        Self::encode_pkg_len(body.len())?,
217                                    );
218                                    let buf = header
219                                        .to_vec()
220                                        .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
221                                    if buf.len() > 255 {
222                                        return Err(cmd_err!(
223                                            CmdErrorCode::RawCodecError,
224                                            "header len too large"
225                                        ));
226                                    }
227                                    write
228                                        .write_u8(buf.len() as u8)
229                                        .await
230                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
231                                    write
232                                        .write_all(buf.as_slice())
233                                        .await
234                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
235                                    tokio::io::copy(&mut body, write.deref_mut().deref_mut())
236                                        .await
237                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
238                                    write
239                                        .flush()
240                                        .await
241                                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
242                                }
243                                Err(e) => {
244                                    log::error!("handle cmd error: {:?}", e);
245                                }
246                                Ok(None) => {}
247                            }
248                        }
249                    }
250                    reader = future
251                        .await
252                        .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
253                }
254                Ok(())
255            }
256            .await;
257            if let Err(e) = ret.as_ref() {
258                log::error!("recv cmd error: {:?}", e);
259            }
260            ret
261        });
262
263        let peer_conn = PeerConnection {
264            conn_id: tunnel_id,
265            peer_id,
266            send: writer,
267            handle: Some(recv_handle),
268        };
269        self.peer_manager.add_peer_connection(peer_conn).await;
270        Ok(())
271    }
272}
273
274#[async_trait::async_trait]
275impl<
276    M: CmdTunnelMeta,
277    R: CmdTunnelRead<M>,
278    W: CmdTunnelWrite<M>,
279    LEN: RawEncode
280        + for<'a> RawDecode<'a>
281        + Copy
282        + RawFixedBytes
283        + Sync
284        + Send
285        + 'static
286        + FromPrimitive
287        + ToPrimitive,
288    CMD: RawEncode
289        + for<'a> RawDecode<'a>
290        + Copy
291        + RawFixedBytes
292        + Sync
293        + Send
294        + 'static
295        + Eq
296        + Hash
297        + Debug,
298> CmdTunnelService<M, R, W> for DefaultCmdServerService<M, R, W, LEN, CMD>
299{
300    async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
301        self.serve_tunnel(tunnel).await
302    }
303}
304
305#[async_trait::async_trait]
306impl<
307    M: CmdTunnelMeta,
308    R: CmdTunnelRead<M>,
309    W: CmdTunnelWrite<M>,
310    LEN: RawEncode
311        + for<'a> RawDecode<'a>
312        + Copy
313        + RawFixedBytes
314        + Sync
315        + Send
316        + 'static
317        + FromPrimitive
318        + ToPrimitive,
319    CMD: RawEncode
320        + for<'a> RawDecode<'a>
321        + Copy
322        + RawFixedBytes
323        + Sync
324        + Send
325        + 'static
326        + Eq
327        + Hash
328        + Debug,
329> CmdServer<LEN, CMD> for DefaultCmdServerService<M, R, W, LEN, CMD>
330{
331    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
332        self.cmd_handler_map.insert(cmd, handler);
333    }
334
335    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
336        let connections = self.peer_manager.find_connections(peer_id);
337        if connections.is_empty() {
338            return Err(cmd_err!(
339                CmdErrorCode::PeerConnectionNotFound,
340                "peer_id: {}",
341                peer_id
342            ));
343        }
344        let mut last_err = None;
345        for conn in connections {
346            let ret: CmdResult<()> = async move {
347                log::debug!(
348                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
349                    peer_id,
350                    conn.conn_id,
351                    cmd,
352                    body.len(),
353                    hex::encode(body)
354                );
355                let header = CmdHeader::<LEN, CMD>::new(
356                    version,
357                    false,
358                    None,
359                    cmd,
360                    Self::encode_pkg_len(body.len() as u64)?,
361                );
362                let buf = header
363                    .to_vec()
364                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
365                let mut send = conn.send.get().await;
366                if buf.len() > 255 {
367                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
368                }
369                send.write_u8(buf.len() as u8)
370                    .await
371                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
372                send.write_all(buf.as_slice())
373                    .await
374                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
375                send.write_all(body)
376                    .await
377                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
378                send.flush()
379                    .await
380                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
381                Ok(())
382            }
383            .await;
384            if ret.is_ok() {
385                return Ok(());
386            }
387            last_err = ret.err();
388        }
389        Err(last_err
390            .unwrap_or_else(|| cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id)))
391    }
392
393    async fn send_with_resp(
394        &self,
395        peer_id: &PeerId,
396        cmd: CMD,
397        version: u8,
398        body: &[u8],
399        timeout: Duration,
400    ) -> CmdResult<CmdBody> {
401        let connections = self.peer_manager.find_connections(peer_id);
402        for conn in connections {
403            if let Some(id) = tokio::task::try_id() {
404                if self.state_holder.has_state(id) {
405                    continue;
406                }
407            }
408            let ret: CmdResult<CmdBody> = async move {
409                log::debug!(
410                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
411                    peer_id,
412                    conn.conn_id,
413                    cmd,
414                    body.len(),
415                    hex::encode(body)
416                );
417                let seq = gen_seq();
418                let header = CmdHeader::<LEN, CMD>::new(
419                    version,
420                    false,
421                    Some(seq),
422                    cmd,
423                    Self::encode_pkg_len(body.len() as u64)?,
424                );
425                let buf = header
426                    .to_vec()
427                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
428                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
429                let waiter = self
430                    .resp_waiter
431                    .create_timeout_result_future(resp_id, timeout)
432                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
433                {
434                    let mut send = conn.send.get().await;
435                    if buf.len() > 255 {
436                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
437                    }
438                    send.write_u8(buf.len() as u8)
439                        .await
440                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
441                    send.write_all(buf.as_slice())
442                        .await
443                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
444                    send.write_all(body)
445                        .await
446                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
447                    send.flush()
448                        .await
449                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
450                }
451                let body =
452                    waiter
453                        .await
454                        .map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
455                Ok(body)
456            }
457            .await;
458            if ret.is_ok() {
459                return ret;
460            } else {
461                sfo_log::error!("send err {:?}", ret.unwrap_err());
462            }
463        }
464        Err(cmd_err!(
465            CmdErrorCode::Failed,
466            "send to peer_id: {}",
467            peer_id
468        ))
469    }
470
471    async fn send_parts(
472        &self,
473        peer_id: &PeerId,
474        cmd: CMD,
475        version: u8,
476        body: &[&[u8]],
477    ) -> CmdResult<()> {
478        let connections = self.peer_manager.find_connections(peer_id);
479        if connections.is_empty() {
480            return Err(cmd_err!(
481                CmdErrorCode::PeerConnectionNotFound,
482                "peer_id: {}",
483                peer_id
484            ));
485        }
486        let mut last_err = None;
487        for conn in connections {
488            let ret: CmdResult<()> = async move {
489                let mut len = 0;
490                for b in body.iter() {
491                    len += b.len();
492                    log::debug!(
493                        "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
494                        peer_id,
495                        conn.conn_id,
496                        cmd,
497                        hex::encode(b)
498                    );
499                }
500                log::debug!(
501                    "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
502                    peer_id,
503                    conn.conn_id,
504                    cmd,
505                    len
506                );
507                let header = CmdHeader::<LEN, CMD>::new(
508                    version,
509                    false,
510                    None,
511                    cmd,
512                    Self::encode_pkg_len(len as u64)?,
513                );
514                let buf = header
515                    .to_vec()
516                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
517                let mut send = conn.send.get().await;
518                if buf.len() > 255 {
519                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
520                }
521                send.write_u8(buf.len() as u8)
522                    .await
523                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
524                send.write_all(buf.as_slice())
525                    .await
526                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
527                for b in body.iter() {
528                    send.write_all(b)
529                        .await
530                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
531                }
532                send.flush()
533                    .await
534                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
535                Ok(())
536            }
537            .await;
538            if ret.is_ok() {
539                return Ok(());
540            }
541            last_err = ret.err();
542        }
543        Err(last_err
544            .unwrap_or_else(|| cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id)))
545    }
546
547    async fn send_parts_with_resp(
548        &self,
549        peer_id: &PeerId,
550        cmd: CMD,
551        version: u8,
552        body: &[&[u8]],
553        timeout: Duration,
554    ) -> CmdResult<CmdBody> {
555        let connections = self.peer_manager.find_connections(peer_id);
556        for conn in connections {
557            if let Some(id) = tokio::task::try_id() {
558                if self.state_holder.has_state(id) {
559                    continue;
560                }
561            }
562            let ret: CmdResult<CmdBody> = async move {
563                let mut len = 0;
564                for b in body.iter() {
565                    len += b.len();
566                    log::debug!(
567                        "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
568                        peer_id,
569                        conn.conn_id,
570                        cmd,
571                        hex::encode(b)
572                    );
573                }
574                log::debug!(
575                    "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
576                    peer_id,
577                    conn.conn_id,
578                    cmd,
579                    len
580                );
581                let seq = gen_seq();
582                let header = CmdHeader::<LEN, CMD>::new(
583                    version,
584                    false,
585                    Some(seq),
586                    cmd,
587                    Self::encode_pkg_len(len as u64)?,
588                );
589                let buf = header
590                    .to_vec()
591                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
592                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
593                let waiter = self
594                    .resp_waiter
595                    .create_timeout_result_future(resp_id, timeout)
596                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
597                {
598                    let mut send = conn.send.get().await;
599                    if buf.len() > 255 {
600                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
601                    }
602                    send.write_u8(buf.len() as u8)
603                        .await
604                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
605                    send.write_all(buf.as_slice())
606                        .await
607                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
608                    for b in body.iter() {
609                        send.write_all(b)
610                            .await
611                            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
612                    }
613                    send.flush()
614                        .await
615                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
616                }
617                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
618                Ok(body)
619            }
620            .await;
621            if ret.is_ok() {
622                return ret;
623            }
624        }
625        Err(cmd_err!(
626            CmdErrorCode::Failed,
627            "send to peer_id: {}",
628            peer_id
629        ))
630    }
631
632    async fn send_cmd(
633        &self,
634        peer_id: &PeerId,
635        cmd: CMD,
636        version: u8,
637        mut body: CmdBody,
638    ) -> CmdResult<()> {
639        let connections = self.peer_manager.find_connections(peer_id);
640        if connections.is_empty() {
641            return Err(cmd_err!(
642                CmdErrorCode::PeerConnectionNotFound,
643                "peer_id: {}",
644                peer_id
645            ));
646        }
647        let mut last_err = None;
648        for conn in connections {
649            let ret: CmdResult<()> = async {
650                log::debug!(
651                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
652                    peer_id,
653                    conn.conn_id,
654                    cmd,
655                    body.len(),
656                    "<streaming>"
657                );
658                let header = CmdHeader::<LEN, CMD>::new(
659                    version,
660                    false,
661                    None,
662                    cmd,
663                    Self::encode_pkg_len(body.len() as u64)?,
664                );
665                let buf = header
666                    .to_vec()
667                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
668                let mut send = conn.send.get().await;
669                if buf.len() > 255 {
670                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
671                }
672                send.write_u8(buf.len() as u8)
673                    .await
674                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
675                send.write_all(buf.as_slice())
676                    .await
677                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
678                tokio::io::copy(&mut body, send.deref_mut().deref_mut())
679                    .await
680                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
681                send.flush()
682                    .await
683                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
684                Ok(())
685            }
686            .await;
687            if ret.is_ok() {
688                return Ok(());
689            }
690            last_err = ret.err();
691        }
692        Err(last_err
693            .unwrap_or_else(|| cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id)))
694    }
695
696    async fn send_cmd_with_resp(
697        &self,
698        peer_id: &PeerId,
699        cmd: CMD,
700        version: u8,
701        mut body: CmdBody,
702        timeout: Duration,
703    ) -> CmdResult<CmdBody> {
704        let connections = self.peer_manager.find_connections(peer_id);
705        if connections.is_empty() {
706            return Err(cmd_err!(
707                CmdErrorCode::PeerConnectionNotFound,
708                "peer_id: {}",
709                peer_id
710            ));
711        }
712        let mut last_err = None;
713        for conn in connections {
714            if let Some(id) = tokio::task::try_id() {
715                if self.state_holder.has_state(id) {
716                    continue;
717                }
718            }
719            let ret: CmdResult<CmdBody> = async {
720                log::debug!(
721                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}",
722                    peer_id,
723                    conn.conn_id,
724                    cmd,
725                    body.len()
726                );
727                let seq = gen_seq();
728                let header = CmdHeader::<LEN, CMD>::new(
729                    version,
730                    false,
731                    Some(seq),
732                    cmd,
733                    Self::encode_pkg_len(body.len())?,
734                );
735                let buf = header
736                    .to_vec()
737                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
738                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
739                let waiter = self
740                    .resp_waiter
741                    .create_timeout_result_future(resp_id, timeout)
742                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
743                {
744                    let mut send = conn.send.get().await;
745                    if buf.len() > 255 {
746                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
747                    }
748                    send.write_u8(buf.len() as u8)
749                        .await
750                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
751                    send.write_all(buf.as_slice())
752                        .await
753                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
754                    tokio::io::copy(&mut body, send.deref_mut().deref_mut())
755                        .await
756                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
757                    send.flush()
758                        .await
759                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
760                }
761                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
762                Ok(body)
763            }
764            .await;
765            if ret.is_ok() {
766                return ret;
767            }
768            last_err = ret.err();
769        }
770        Err(last_err
771            .unwrap_or_else(|| cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id)))
772    }
773
774    async fn send_by_specify_tunnel(
775        &self,
776        peer_id: &PeerId,
777        tunnel_id: TunnelId,
778        cmd: CMD,
779        version: u8,
780        body: &[u8],
781    ) -> CmdResult<()> {
782        let conn = self.peer_manager.find_connection(tunnel_id);
783        if conn.is_none() {
784            return Err(cmd_err!(
785                CmdErrorCode::PeerConnectionNotFound,
786                "tunnel_id: {:?}",
787                tunnel_id
788            ));
789        }
790        let conn = conn.unwrap();
791        assert_eq!(tunnel_id, conn.conn_id);
792        log::trace!(
793            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
794            peer_id,
795            conn.conn_id,
796            cmd,
797            body.len(),
798            hex::encode(body)
799        );
800        let header = CmdHeader::<LEN, CMD>::new(
801            version,
802            false,
803            None,
804            cmd,
805            Self::encode_pkg_len(body.len() as u64)?,
806        );
807        let buf = header
808            .to_vec()
809            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
810        let mut send = conn.send.get().await;
811        if buf.len() > 255 {
812            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
813        }
814        send.write_u8(buf.len() as u8)
815            .await
816            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
817        send.write_all(buf.as_slice())
818            .await
819            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
820        send.write_all(body)
821            .await
822            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
823        send.flush()
824            .await
825            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
826        Ok(())
827    }
828
829    async fn send_by_specify_tunnel_with_resp(
830        &self,
831        peer_id: &PeerId,
832        tunnel_id: TunnelId,
833        cmd: CMD,
834        version: u8,
835        body: &[u8],
836        timeout: Duration,
837    ) -> CmdResult<CmdBody> {
838        let conn = self.peer_manager.find_connection(tunnel_id);
839        if conn.is_none() {
840            return Err(cmd_err!(
841                CmdErrorCode::PeerConnectionNotFound,
842                "tunnel_id: {:?}",
843                tunnel_id
844            ));
845        }
846        let conn = conn.unwrap();
847        if let Some(id) = tokio::task::try_id() {
848            if self.state_holder.has_state(id) {
849                return Err(cmd_err!(
850                    CmdErrorCode::Failed,
851                    "can't send msg with resp in tunnel {:?} msg handle",
852                    conn.conn_id
853                ));
854            }
855        }
856        assert_eq!(tunnel_id, conn.conn_id);
857        log::trace!(
858            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
859            peer_id,
860            conn.conn_id,
861            cmd,
862            body.len(),
863            hex::encode(body)
864        );
865        let seq = gen_seq();
866        let header = CmdHeader::<LEN, CMD>::new(
867            version,
868            false,
869            Some(seq),
870            cmd,
871            Self::encode_pkg_len(body.len() as u64)?,
872        );
873        let buf = header
874            .to_vec()
875            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
876        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
877        let waiter = self
878            .resp_waiter
879            .create_timeout_result_future(resp_id, timeout)
880            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
881        {
882            let mut send = conn.send.get().await;
883            if buf.len() > 255 {
884                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
885            }
886            send.write_u8(buf.len() as u8)
887                .await
888                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
889            send.write_all(buf.as_slice())
890                .await
891                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
892            send.write_all(body)
893                .await
894                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
895            send.flush()
896                .await
897                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
898        }
899        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
900        Ok(body)
901    }
902
903    async fn send_parts_by_specify_tunnel(
904        &self,
905        peer_id: &PeerId,
906        tunnel_id: TunnelId,
907        cmd: CMD,
908        version: u8,
909        body: &[&[u8]],
910    ) -> CmdResult<()> {
911        let conn = self.peer_manager.find_connection(tunnel_id);
912        if conn.is_none() {
913            return Err(cmd_err!(
914                CmdErrorCode::PeerConnectionNotFound,
915                "tunnel_id: {:?}",
916                tunnel_id
917            ));
918        }
919        let conn = conn.unwrap();
920        assert_eq!(tunnel_id, conn.conn_id);
921        let mut len = 0;
922        for b in body.iter() {
923            len += b.len();
924            log::debug!(
925                "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
926                peer_id,
927                conn.conn_id,
928                cmd,
929                hex::encode(b)
930            );
931        }
932        log::debug!(
933            "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
934            peer_id,
935            conn.conn_id,
936            cmd,
937            len
938        );
939        let header = CmdHeader::<LEN, CMD>::new(
940            version,
941            false,
942            None,
943            cmd,
944            Self::encode_pkg_len(len as u64)?,
945        );
946        let buf = header
947            .to_vec()
948            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
949        if buf.len() > 255 {
950            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
951        }
952        let mut send = conn.send.get().await;
953        send.write_u8(buf.len() as u8)
954            .await
955            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
956        send.write_all(buf.as_slice())
957            .await
958            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
959        for b in body.iter() {
960            send.write_all(b)
961                .await
962                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
963        }
964        send.flush()
965            .await
966            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
967        Ok(())
968    }
969
970    async fn send_parts_by_specify_tunnel_with_resp(
971        &self,
972        peer_id: &PeerId,
973        tunnel_id: TunnelId,
974        cmd: CMD,
975        version: u8,
976        body: &[&[u8]],
977        timeout: Duration,
978    ) -> CmdResult<CmdBody> {
979        let conn = self.peer_manager.find_connection(tunnel_id);
980        if conn.is_none() {
981            return Err(cmd_err!(
982                CmdErrorCode::PeerConnectionNotFound,
983                "tunnel_id: {:?}",
984                tunnel_id
985            ));
986        }
987        let conn = conn.unwrap();
988        if let Some(id) = tokio::task::try_id() {
989            if self.state_holder.has_state(id) {
990                return Err(cmd_err!(
991                    CmdErrorCode::Failed,
992                    "can't send msg with resp in tunnel {:?} msg handle",
993                    conn.conn_id
994                ));
995            }
996        }
997        assert_eq!(tunnel_id, conn.conn_id);
998        let mut len = 0;
999        for b in body.iter() {
1000            len += b.len();
1001            log::debug!(
1002                "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
1003                peer_id,
1004                conn.conn_id,
1005                cmd,
1006                hex::encode(b)
1007            );
1008        }
1009        log::debug!(
1010            "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
1011            peer_id,
1012            conn.conn_id,
1013            cmd,
1014            len
1015        );
1016        let seq = gen_seq();
1017        let header = CmdHeader::<LEN, CMD>::new(
1018            version,
1019            false,
1020            Some(seq),
1021            cmd,
1022            Self::encode_pkg_len(len as u64)?,
1023        );
1024        let buf = header
1025            .to_vec()
1026            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1027        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1028        let waiter = self
1029            .resp_waiter
1030            .create_timeout_result_future(resp_id, timeout)
1031            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1032        if buf.len() > 255 {
1033            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1034        }
1035        {
1036            let mut send = conn.send.get().await;
1037            send.write_u8(buf.len() as u8)
1038                .await
1039                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1040            send.write_all(buf.as_slice())
1041                .await
1042                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1043            for b in body.iter() {
1044                send.write_all(b)
1045                    .await
1046                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1047            }
1048            send.flush()
1049                .await
1050                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1051        }
1052        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1053        Ok(body)
1054    }
1055
1056    async fn send_cmd_by_specify_tunnel(
1057        &self,
1058        peer_id: &PeerId,
1059        tunnel_id: TunnelId,
1060        cmd: CMD,
1061        version: u8,
1062        mut body: CmdBody,
1063    ) -> CmdResult<()> {
1064        let conn = self.peer_manager.find_connection(tunnel_id);
1065        if conn.is_none() {
1066            return Err(cmd_err!(
1067                CmdErrorCode::PeerConnectionNotFound,
1068                "tunnel_id: {:?}",
1069                tunnel_id
1070            ));
1071        }
1072        let conn = conn.unwrap();
1073        assert_eq!(tunnel_id, conn.conn_id);
1074        log::debug!(
1075            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1076            peer_id,
1077            conn.conn_id,
1078            cmd,
1079            body.len()
1080        );
1081        let header = CmdHeader::<LEN, CMD>::new(
1082            version,
1083            false,
1084            None,
1085            cmd,
1086            Self::encode_pkg_len(body.len())?,
1087        );
1088        let buf = header
1089            .to_vec()
1090            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1091        let mut send = conn.send.get().await;
1092        if buf.len() > 255 {
1093            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1094        }
1095        send.write_u8(buf.len() as u8)
1096            .await
1097            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1098        send.write_all(buf.as_slice())
1099            .await
1100            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1101        tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1102            .await
1103            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1104        send.flush()
1105            .await
1106            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1107        Ok(())
1108    }
1109
1110    async fn send_cmd_by_specify_tunnel_with_resp(
1111        &self,
1112        peer_id: &PeerId,
1113        tunnel_id: TunnelId,
1114        cmd: CMD,
1115        version: u8,
1116        mut body: CmdBody,
1117        timeout: Duration,
1118    ) -> CmdResult<CmdBody> {
1119        let conn = self.peer_manager.find_connection(tunnel_id);
1120        if conn.is_none() {
1121            return Err(cmd_err!(
1122                CmdErrorCode::PeerConnectionNotFound,
1123                "tunnel_id: {:?}",
1124                tunnel_id
1125            ));
1126        }
1127        let conn = conn.unwrap();
1128        if let Some(id) = tokio::task::try_id() {
1129            if self.state_holder.has_state(id) {
1130                return Err(cmd_err!(
1131                    CmdErrorCode::Failed,
1132                    "can't send msg with resp in tunnel {:?} msg handle",
1133                    conn.conn_id
1134                ));
1135            }
1136        }
1137        assert_eq!(tunnel_id, conn.conn_id);
1138        log::debug!(
1139            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1140            peer_id,
1141            conn.conn_id,
1142            cmd,
1143            body.len()
1144        );
1145        let seq = gen_seq();
1146        let header = CmdHeader::<LEN, CMD>::new(
1147            version,
1148            false,
1149            Some(seq),
1150            cmd,
1151            Self::encode_pkg_len(body.len())?,
1152        );
1153        let buf = header
1154            .to_vec()
1155            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1156        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1157        let waiter = self
1158            .resp_waiter
1159            .create_timeout_result_future(resp_id, timeout)
1160            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1161        {
1162            let mut send = conn.send.get().await;
1163            if buf.len() > 255 {
1164                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1165            }
1166            send.write_u8(buf.len() as u8)
1167                .await
1168                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1169            send.write_all(buf.as_slice())
1170                .await
1171                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1172            tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1173                .await
1174                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1175            send.flush()
1176                .await
1177                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1178        }
1179        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1180        Ok(body)
1181    }
1182
1183    async fn send_by_all_tunnels(
1184        &self,
1185        peer_id: &PeerId,
1186        cmd: CMD,
1187        version: u8,
1188        body: &[u8],
1189    ) -> CmdResult<()> {
1190        let connections = self.peer_manager.find_connections(peer_id);
1191        let header = CmdHeader::<LEN, CMD>::new(
1192            version,
1193            false,
1194            None,
1195            cmd,
1196            Self::encode_pkg_len(body.len() as u64)?,
1197        );
1198        let buf = header
1199            .to_vec()
1200            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1201        if buf.len() > 255 {
1202            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1203        }
1204        for conn in connections {
1205            let ret: CmdResult<()> = async {
1206                let mut send = conn.send.get().await;
1207                send.write_u8(buf.len() as u8)
1208                    .await
1209                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1210                send.write_all(buf.as_slice())
1211                    .await
1212                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1213                send.write_all(body)
1214                    .await
1215                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1216                send.flush()
1217                    .await
1218                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1219                Ok(())
1220            }
1221            .await;
1222            if let Err(e) = ret {
1223                log::error!(
1224                    "broadcast send failed peer_id: {}, tunnel_id: {:?}, cmd: {:?}, err: {:?}",
1225                    peer_id,
1226                    conn.conn_id,
1227                    cmd,
1228                    e
1229                );
1230            }
1231        }
1232        Ok(())
1233    }
1234
1235    async fn send_parts_by_all_tunnels(
1236        &self,
1237        peer_id: &PeerId,
1238        cmd: CMD,
1239        version: u8,
1240        body: &[&[u8]],
1241    ) -> CmdResult<()> {
1242        let connections = self.peer_manager.find_connections(peer_id);
1243        let mut len = 0;
1244        for b in body.iter() {
1245            len += b.len();
1246        }
1247        let header = CmdHeader::<LEN, CMD>::new(
1248            version,
1249            false,
1250            None,
1251            cmd,
1252            Self::encode_pkg_len(len as u64)?,
1253        );
1254        let buf = header
1255            .to_vec()
1256            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1257        if buf.len() > 255 {
1258            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1259        }
1260        for conn in connections {
1261            let ret: CmdResult<()> = async {
1262                let mut send = conn.send.get().await;
1263                send.write_u8(buf.len() as u8)
1264                    .await
1265                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1266                send.write_all(buf.as_slice())
1267                    .await
1268                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1269                for b in body.iter() {
1270                    send.write_all(b)
1271                        .await
1272                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1273                }
1274                send.flush()
1275                    .await
1276                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1277                Ok(())
1278            }
1279            .await;
1280            if let Err(e) = ret {
1281                log::error!(
1282                    "broadcast send2 failed peer_id: {}, tunnel_id: {:?}, cmd: {:?}, err: {:?}",
1283                    peer_id,
1284                    conn.conn_id,
1285                    cmd,
1286                    e
1287                );
1288            }
1289        }
1290        Ok(())
1291    }
1292}
1293
1294pub struct DefaultCmdServerIncoming<
1295    M: CmdTunnelMeta,
1296    R: CmdTunnelRead<M>,
1297    W: CmdTunnelWrite<M>,
1298    LISTENER,
1299> {
1300    tunnel_listener: LISTENER,
1301    tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1302    _p: PhantomData<fn() -> (M, R, W)>,
1303}
1304
1305impl<
1306    M: CmdTunnelMeta,
1307    R: CmdTunnelRead<M>,
1308    W: CmdTunnelWrite<M>,
1309    LISTENER: CmdTunnelListener<M, R, W>,
1310> DefaultCmdServerIncoming<M, R, W, LISTENER>
1311{
1312    pub fn new(
1313        tunnel_listener: LISTENER,
1314        tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1315    ) -> Arc<Self> {
1316        Arc::new(Self {
1317            tunnel_listener,
1318            tunnel_service,
1319            _p: PhantomData,
1320        })
1321    }
1322
1323    pub fn start(self: &Arc<Self>) {
1324        let this = self.clone();
1325        tokio::spawn(async move {
1326            if let Err(e) = this.run().await {
1327                log::error!("cmd server error: {:?}", e);
1328            }
1329        });
1330    }
1331
1332    pub async fn run(&self) -> CmdResult<()> {
1333        loop {
1334            let tunnel = self.tunnel_listener.accept().await?;
1335            let tunnel_service = self.tunnel_service.clone();
1336            tokio::spawn(async move {
1337                if let Err(e) = tunnel_service.handle_tunnel(tunnel).await {
1338                    log::error!("peer connection error: {:?}", e);
1339                }
1340            });
1341        }
1342    }
1343}
1344
1345pub struct DefaultCmdServer<
1346    M: CmdTunnelMeta,
1347    R: CmdTunnelRead<M>,
1348    W: CmdTunnelWrite<M>,
1349    LEN,
1350    CMD,
1351    LISTENER,
1352> {
1353    incoming: Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>>,
1354    service: Arc<DefaultCmdServerService<M, R, W, LEN, CMD>>,
1355}
1356
1357impl<
1358    M: CmdTunnelMeta,
1359    R: CmdTunnelRead<M>,
1360    W: CmdTunnelWrite<M>,
1361    LEN: RawEncode
1362        + for<'a> RawDecode<'a>
1363        + Copy
1364        + RawFixedBytes
1365        + Sync
1366        + Send
1367        + 'static
1368        + FromPrimitive
1369        + ToPrimitive,
1370    CMD: RawEncode
1371        + for<'a> RawDecode<'a>
1372        + Copy
1373        + RawFixedBytes
1374        + Sync
1375        + Send
1376        + 'static
1377        + Eq
1378        + Hash
1379        + Debug,
1380    LISTENER: CmdTunnelListener<M, R, W>,
1381> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1382{
1383    pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
1384        let service = DefaultCmdServerService::new();
1385        let incoming = DefaultCmdServerIncoming::new(tunnel_listener, service.clone());
1386        Arc::new(Self { incoming, service })
1387    }
1388
1389    pub fn incoming(&self) -> Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>> {
1390        self.incoming.clone()
1391    }
1392
1393    pub fn service(&self) -> Arc<DefaultCmdServerService<M, R, W, LEN, CMD>> {
1394        self.service.clone()
1395    }
1396
1397    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
1398        self.service.attach_event_listener(event_listener);
1399    }
1400
1401    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
1402        self.service.get_peer_tunnels(peer_id).await
1403    }
1404
1405    pub fn start(self: &Arc<Self>) {
1406        self.incoming.start();
1407    }
1408}
1409
1410impl<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> Deref
1411    for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1412{
1413    type Target = DefaultCmdServerService<M, R, W, LEN, CMD>;
1414
1415    fn deref(&self) -> &Self::Target {
1416        self.service.as_ref()
1417    }
1418}
1419
1420#[async_trait::async_trait]
1421impl<
1422    M: CmdTunnelMeta,
1423    R: CmdTunnelRead<M>,
1424    W: CmdTunnelWrite<M>,
1425    LEN: RawEncode
1426        + for<'a> RawDecode<'a>
1427        + Copy
1428        + RawFixedBytes
1429        + Sync
1430        + Send
1431        + 'static
1432        + FromPrimitive
1433        + ToPrimitive,
1434    CMD: RawEncode
1435        + for<'a> RawDecode<'a>
1436        + Copy
1437        + RawFixedBytes
1438        + Sync
1439        + Send
1440        + 'static
1441        + Eq
1442        + Hash
1443        + Debug,
1444    LISTENER: CmdTunnelListener<M, R, W>,
1445> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1446{
1447    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
1448        self.service.register_cmd_handler(cmd, handler);
1449    }
1450
1451    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
1452        self.service.send(peer_id, cmd, version, body).await
1453    }
1454
1455    async fn send_with_resp(
1456        &self,
1457        peer_id: &PeerId,
1458        cmd: CMD,
1459        version: u8,
1460        body: &[u8],
1461        timeout: Duration,
1462    ) -> CmdResult<CmdBody> {
1463        self.service
1464            .send_with_resp(peer_id, cmd, version, body, timeout)
1465            .await
1466    }
1467
1468    async fn send_parts(
1469        &self,
1470        peer_id: &PeerId,
1471        cmd: CMD,
1472        version: u8,
1473        body: &[&[u8]],
1474    ) -> CmdResult<()> {
1475        self.service.send_parts(peer_id, cmd, version, body).await
1476    }
1477
1478    async fn send_parts_with_resp(
1479        &self,
1480        peer_id: &PeerId,
1481        cmd: CMD,
1482        version: u8,
1483        body: &[&[u8]],
1484        timeout: Duration,
1485    ) -> CmdResult<CmdBody> {
1486        self.service
1487            .send_parts_with_resp(peer_id, cmd, version, body, timeout)
1488            .await
1489    }
1490
1491    async fn send_cmd(
1492        &self,
1493        peer_id: &PeerId,
1494        cmd: CMD,
1495        version: u8,
1496        body: CmdBody,
1497    ) -> CmdResult<()> {
1498        self.service.send_cmd(peer_id, cmd, version, body).await
1499    }
1500
1501    async fn send_cmd_with_resp(
1502        &self,
1503        peer_id: &PeerId,
1504        cmd: CMD,
1505        version: u8,
1506        body: CmdBody,
1507        timeout: Duration,
1508    ) -> CmdResult<CmdBody> {
1509        self.service
1510            .send_cmd_with_resp(peer_id, cmd, version, body, timeout)
1511            .await
1512    }
1513
1514    async fn send_by_specify_tunnel(
1515        &self,
1516        peer_id: &PeerId,
1517        tunnel_id: TunnelId,
1518        cmd: CMD,
1519        version: u8,
1520        body: &[u8],
1521    ) -> CmdResult<()> {
1522        self.service
1523            .send_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1524            .await
1525    }
1526
1527    async fn send_by_specify_tunnel_with_resp(
1528        &self,
1529        peer_id: &PeerId,
1530        tunnel_id: TunnelId,
1531        cmd: CMD,
1532        version: u8,
1533        body: &[u8],
1534        timeout: Duration,
1535    ) -> CmdResult<CmdBody> {
1536        self.service
1537            .send_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1538            .await
1539    }
1540
1541    async fn send_parts_by_specify_tunnel(
1542        &self,
1543        peer_id: &PeerId,
1544        tunnel_id: TunnelId,
1545        cmd: CMD,
1546        version: u8,
1547        body: &[&[u8]],
1548    ) -> CmdResult<()> {
1549        self.service
1550            .send_parts_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1551            .await
1552    }
1553
1554    async fn send_parts_by_specify_tunnel_with_resp(
1555        &self,
1556        peer_id: &PeerId,
1557        tunnel_id: TunnelId,
1558        cmd: CMD,
1559        version: u8,
1560        body: &[&[u8]],
1561        timeout: Duration,
1562    ) -> CmdResult<CmdBody> {
1563        self.service
1564            .send_parts_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1565            .await
1566    }
1567
1568    async fn send_cmd_by_specify_tunnel(
1569        &self,
1570        peer_id: &PeerId,
1571        tunnel_id: TunnelId,
1572        cmd: CMD,
1573        version: u8,
1574        body: CmdBody,
1575    ) -> CmdResult<()> {
1576        self.service
1577            .send_cmd_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1578            .await
1579    }
1580
1581    async fn send_cmd_by_specify_tunnel_with_resp(
1582        &self,
1583        peer_id: &PeerId,
1584        tunnel_id: TunnelId,
1585        cmd: CMD,
1586        version: u8,
1587        body: CmdBody,
1588        timeout: Duration,
1589    ) -> CmdResult<CmdBody> {
1590        self.service
1591            .send_cmd_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1592            .await
1593    }
1594
1595    async fn send_by_all_tunnels(
1596        &self,
1597        peer_id: &PeerId,
1598        cmd: CMD,
1599        version: u8,
1600        body: &[u8],
1601    ) -> CmdResult<()> {
1602        self.service
1603            .send_by_all_tunnels(peer_id, cmd, version, body)
1604            .await
1605    }
1606
1607    async fn send_parts_by_all_tunnels(
1608        &self,
1609        peer_id: &PeerId,
1610        cmd: CMD,
1611        version: u8,
1612        body: &[&[u8]],
1613    ) -> CmdResult<()> {
1614        self.service
1615            .send_parts_by_all_tunnels(peer_id, cmd, version, body)
1616            .await
1617    }
1618}