Skip to main content

sfo_cmd_server/server/
server.rs

1use super::peer_manager::{PeerManager, PeerManagerRef};
2use crate::client::{RespWaiter, RespWaiterRef, gen_resp_id, gen_seq};
3use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
4use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
5use crate::peer_connection::PeerConnection;
6use crate::peer_id::PeerId;
7use crate::server::CmdServer;
8use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
9use async_named_locker::{NamedStateHolder, ObjectHolder};
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_split::Splittable;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::DerefMut;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
19
20#[async_trait::async_trait]
21pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
22    Send + Sync + 'static
23{
24    async fn accept(&self) -> CmdResult<Splittable<R, W>>;
25}
26
27#[async_trait::async_trait]
28pub trait CmdServerEventListener: Send + Sync + 'static {
29    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
30    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
31}
32
33#[derive(Clone)]
34struct CmdServerEventListenerEmit {
35    listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
36}
37
38impl CmdServerEventListenerEmit {
39    pub fn new() -> Self {
40        Self {
41            listeners: Arc::new(Mutex::new(Vec::new())),
42        }
43    }
44
45    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
46        self.listeners.lock().unwrap().push(event_listener);
47    }
48}
49
50#[async_trait::async_trait]
51impl CmdServerEventListener for CmdServerEventListenerEmit {
52    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
53        let listeners = { self.listeners.lock().unwrap().clone() };
54        for listener in listeners.iter() {
55            if let Err(e) = listener.on_peer_connected(peer_id).await {
56                log::error!("on_peer_connected error: {:?}", e);
57            }
58        }
59        Ok(())
60    }
61
62    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
63        let listeners = { self.listeners.lock().unwrap().clone() };
64        for listener in listeners.iter() {
65            if let Err(e) = listener.on_peer_disconnected(peer_id).await {
66                log::error!("on_peer_disconnected error: {:?}", e);
67            }
68        }
69        Ok(())
70    }
71}
72
73pub struct DefaultCmdServer<
74    M: CmdTunnelMeta,
75    R: CmdTunnelRead<M>,
76    W: CmdTunnelWrite<M>,
77    LEN,
78    CMD,
79    LISTENER,
80> {
81    tunnel_listener: LISTENER,
82    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
83    peer_manager: PeerManagerRef<M, R, W>,
84    event_emit: CmdServerEventListenerEmit,
85    resp_waiter: RespWaiterRef,
86    state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
87    _l: Mutex<std::marker::PhantomData<(R, W, LEN, CMD)>>,
88}
89
90impl<
91    M: CmdTunnelMeta,
92    R: CmdTunnelRead<M>,
93    W: CmdTunnelWrite<M>,
94    LEN: RawEncode
95        + for<'a> RawDecode<'a>
96        + Copy
97        + RawFixedBytes
98        + Sync
99        + Send
100        + 'static
101        + FromPrimitive
102        + ToPrimitive,
103    CMD: RawEncode
104        + for<'a> RawDecode<'a>
105        + Copy
106        + RawFixedBytes
107        + Sync
108        + Send
109        + 'static
110        + Eq
111        + Hash
112        + Debug,
113    LISTENER: CmdTunnelListener<M, R, W>,
114> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
115{
116    pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
117        let event_emit = CmdServerEventListenerEmit::new();
118        Arc::new(Self {
119            tunnel_listener,
120            cmd_handler_map: Arc::new(CmdHandlerMap::new()),
121            peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
122            event_emit,
123            resp_waiter: Arc::new(RespWaiter::new()),
124            state_holder: NamedStateHolder::new(),
125            _l: Default::default(),
126        })
127    }
128
129    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
130        self.event_emit.attach_event_listener(event_listener);
131    }
132
133    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
134        let connections = self.peer_manager.find_connections(peer_id);
135        connections
136    }
137
138    pub fn start(self: &Arc<Self>) {
139        let this = self.clone();
140        tokio::spawn(async move {
141            if let Err(e) = this.run().await {
142                log::error!("cmd server error: {:?}", e);
143            }
144        });
145    }
146
147    async fn run(self: &Arc<Self>) -> CmdResult<()> {
148        loop {
149            let tunnel = self.tunnel_listener.accept().await?;
150            let peer_id = tunnel.get_remote_peer_id();
151            let tunnel_id = self.peer_manager.generate_conn_id();
152            let this = self.clone();
153            let resp_waiter = self.resp_waiter.clone();
154            let state_holder = self.state_holder.clone();
155            tokio::spawn(async move {
156                let remote_id = peer_id.clone();
157                let ret: CmdResult<()> = async move {
158                    let this = this.clone();
159                    let cmd_handler_map = this.cmd_handler_map.clone();
160                    let (mut reader, writer) = tunnel.split();
161                    let writer = ObjectHolder::new(writer);
162                    let resp_write = writer.clone();
163                    let resp_waiter = resp_waiter.clone();
164                    let state_holder = state_holder.clone();
165                    let recv_handle = tokio::spawn(async move {
166                        let ret: CmdResult<()> = async move {
167                            loop {
168                                let header_len = reader
169                                    .read_u8()
170                                    .await
171                                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
172                                let mut header = vec![0u8; header_len as usize];
173                                let n = reader
174                                    .read_exact(&mut header)
175                                    .await
176                                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
177                                if n == 0 {
178                                    break;
179                                }
180                                let header =
181                                    CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
182                                        .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
183                                sfo_log::debug!("recv cmd {:?}", header.cmd_code());
184                                let body_len = header.pkg_len().to_u64().unwrap();
185                                let cmd_read = CmdBodyRead::new(
186                                    reader,
187                                    header.pkg_len().to_u64().unwrap() as usize,
188                                );
189                                let waiter = cmd_read.get_waiter();
190                                let future = waiter
191                                    .create_result_future()
192                                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
193                                {
194                                    let body_read = cmd_read;
195                                    let body =
196                                        CmdBody::from_reader(BufReader::new(body_read), body_len);
197                                    if header.is_resp() && header.seq().is_some() {
198                                        let resp_id = gen_resp_id(
199                                            tunnel_id,
200                                            header.cmd_code(),
201                                            header.seq().unwrap(),
202                                        );
203                                        let _ = resp_waiter.set_result(resp_id, body);
204                                    } else {
205                                        if let Some(handler) =
206                                            cmd_handler_map.get(header.cmd_code())
207                                        {
208                                            let version = header.version();
209                                            let seq = header.seq();
210                                            let cmd_code = header.cmd_code();
211                                            match {
212                                                let _handle_state =
213                                                    state_holder.new_state(tokio::task::id());
214                                                handler
215                                                    .handle(
216                                                        remote_id.clone(),
217                                                        tunnel_id,
218                                                        header,
219                                                        body,
220                                                    )
221                                                    .await
222                                            } {
223                                                Ok(Some(mut body)) => {
224                                                    let mut write = resp_write.get().await;
225                                                    let header = CmdHeader::<LEN, CMD>::new(
226                                                        version,
227                                                        true,
228                                                        seq,
229                                                        cmd_code,
230                                                        LEN::from_u64(body.len()).unwrap(),
231                                                    );
232                                                    let buf = header.to_vec().map_err(
233                                                        into_cmd_err!(CmdErrorCode::RawCodecError),
234                                                    )?;
235                                                    if buf.len() > 255 {
236                                                        return Err(cmd_err!(
237                                                            CmdErrorCode::RawCodecError,
238                                                            "header len too large"
239                                                        ));
240                                                    }
241                                                    write.write_u8(buf.len() as u8).await.map_err(
242                                                        into_cmd_err!(CmdErrorCode::IoError),
243                                                    )?;
244                                                    write.write_all(buf.as_slice()).await.map_err(
245                                                        into_cmd_err!(CmdErrorCode::IoError),
246                                                    )?;
247                                                    tokio::io::copy(
248                                                        &mut body,
249                                                        write.deref_mut().deref_mut(),
250                                                    )
251                                                    .await
252                                                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
253                                                    write.flush().await.map_err(into_cmd_err!(
254                                                        CmdErrorCode::IoError
255                                                    ))?;
256                                                }
257                                                Err(e) => {
258                                                    log::error!("handle cmd error: {:?}", e);
259                                                }
260                                                _ => {}
261                                            }
262                                        }
263                                    }
264                                };
265                                reader = future
266                                    .await
267                                    .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
268                                // }
269                            }
270                            Ok(())
271                        }
272                        .await;
273                        if ret.is_err() {
274                            log::error!("recv cmd error: {:?}", ret.as_ref().err().unwrap());
275                        }
276                        ret
277                    });
278
279                    let peer_conn = PeerConnection {
280                        conn_id: tunnel_id,
281                        peer_id: peer_id.clone(),
282                        send: writer,
283                        handle: Some(recv_handle),
284                    };
285                    this.peer_manager.add_peer_connection(peer_conn).await;
286                    Ok(())
287                }
288                .await;
289                if let Err(e) = ret {
290                    log::error!("peer connection error: {:?}", e);
291                }
292            });
293        }
294    }
295}
296
297#[async_trait::async_trait]
298impl<
299    M: CmdTunnelMeta,
300    R: CmdTunnelRead<M>,
301    W: CmdTunnelWrite<M>,
302    LEN: RawEncode
303        + for<'a> RawDecode<'a>
304        + Copy
305        + RawFixedBytes
306        + Sync
307        + Send
308        + 'static
309        + FromPrimitive
310        + ToPrimitive,
311    CMD: RawEncode
312        + for<'a> RawDecode<'a>
313        + Copy
314        + RawFixedBytes
315        + Sync
316        + Send
317        + 'static
318        + Eq
319        + Hash
320        + Debug,
321    LISTENER: CmdTunnelListener<M, R, W>,
322> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
323{
324    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
325        self.cmd_handler_map.insert(cmd, handler);
326    }
327
328    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
329        let connections = self.peer_manager.find_connections(peer_id);
330        for conn in connections {
331            let ret: CmdResult<()> = async move {
332                log::debug!(
333                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
334                    peer_id,
335                    conn.conn_id,
336                    cmd,
337                    body.len(),
338                    hex::encode(body)
339                );
340                let header = CmdHeader::<LEN, CMD>::new(
341                    version,
342                    false,
343                    None,
344                    cmd,
345                    LEN::from_u64(body.len() as u64).unwrap(),
346                );
347                let buf = header
348                    .to_vec()
349                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
350                let mut send = conn.send.get().await;
351                if buf.len() > 255 {
352                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
353                }
354                send.write_u8(buf.len() as u8)
355                    .await
356                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
357                send.write_all(buf.as_slice())
358                    .await
359                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
360                send.write_all(body)
361                    .await
362                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
363                send.flush()
364                    .await
365                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
366                Ok(())
367            }
368            .await;
369            if ret.is_ok() {
370                break;
371            }
372        }
373        Ok(())
374    }
375
376    async fn send_with_resp(
377        &self,
378        peer_id: &PeerId,
379        cmd: CMD,
380        version: u8,
381        body: &[u8],
382        timeout: Duration,
383    ) -> CmdResult<CmdBody> {
384        let connections = self.peer_manager.find_connections(peer_id);
385        for conn in connections {
386            if let Some(id) = tokio::task::try_id() {
387                if self.state_holder.has_state(id) {
388                    continue;
389                }
390            }
391            let ret: CmdResult<CmdBody> = async move {
392                log::debug!(
393                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
394                    peer_id,
395                    conn.conn_id,
396                    cmd,
397                    body.len(),
398                    hex::encode(body)
399                );
400                let seq = gen_seq();
401                let header = CmdHeader::<LEN, CMD>::new(
402                    version,
403                    false,
404                    Some(seq),
405                    cmd,
406                    LEN::from_u64(body.len() as u64).unwrap(),
407                );
408                let buf = header
409                    .to_vec()
410                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
411                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
412                let waiter = self
413                    .resp_waiter
414                    .create_timeout_result_future(resp_id, timeout)
415                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
416                {
417                    let mut send = conn.send.get().await;
418                    if buf.len() > 255 {
419                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
420                    }
421                    send.write_u8(buf.len() as u8)
422                        .await
423                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
424                    send.write_all(buf.as_slice())
425                        .await
426                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
427                    send.write_all(body)
428                        .await
429                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
430                    send.flush()
431                        .await
432                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
433                }
434                let body =
435                    waiter
436                        .await
437                        .map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
438                Ok(body)
439            }
440            .await;
441            if ret.is_ok() {
442                return ret;
443            } else {
444                sfo_log::error!("send err {:?}", ret.unwrap_err());
445            }
446        }
447        Err(cmd_err!(
448            CmdErrorCode::Failed,
449            "send to peer_id: {}",
450            peer_id
451        ))
452    }
453
454    async fn send2(
455        &self,
456        peer_id: &PeerId,
457        cmd: CMD,
458        version: u8,
459        body: &[&[u8]],
460    ) -> CmdResult<()> {
461        let connections = self.peer_manager.find_connections(peer_id);
462        for conn in connections {
463            let ret: CmdResult<()> = async move {
464                let mut len = 0;
465                for b in body.iter() {
466                    len += b.len();
467                    log::debug!(
468                        "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
469                        peer_id,
470                        conn.conn_id,
471                        cmd,
472                        hex::encode(b)
473                    );
474                }
475                log::debug!(
476                    "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
477                    peer_id,
478                    conn.conn_id,
479                    cmd,
480                    len
481                );
482                let header = CmdHeader::<LEN, CMD>::new(
483                    version,
484                    false,
485                    None,
486                    cmd,
487                    LEN::from_u64(len as u64).unwrap(),
488                );
489                let buf = header
490                    .to_vec()
491                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
492                let mut send = conn.send.get().await;
493                if buf.len() > 255 {
494                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
495                }
496                send.write_u8(buf.len() as u8)
497                    .await
498                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
499                send.write_all(buf.as_slice())
500                    .await
501                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
502                for b in body.iter() {
503                    send.write_all(b)
504                        .await
505                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
506                }
507                send.flush()
508                    .await
509                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
510                Ok(())
511            }
512            .await;
513            if ret.is_ok() {
514                break;
515            }
516        }
517        Ok(())
518    }
519
520    async fn send2_with_resp(
521        &self,
522        peer_id: &PeerId,
523        cmd: CMD,
524        version: u8,
525        body: &[&[u8]],
526        timeout: Duration,
527    ) -> CmdResult<CmdBody> {
528        let connections = self.peer_manager.find_connections(peer_id);
529        for conn in connections {
530            if let Some(id) = tokio::task::try_id() {
531                if self.state_holder.has_state(id) {
532                    continue;
533                }
534            }
535            let ret: CmdResult<CmdBody> = async move {
536                let mut len = 0;
537                for b in body.iter() {
538                    len += b.len();
539                    log::debug!(
540                        "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
541                        peer_id,
542                        conn.conn_id,
543                        cmd,
544                        hex::encode(b)
545                    );
546                }
547                log::debug!(
548                    "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
549                    peer_id,
550                    conn.conn_id,
551                    cmd,
552                    len
553                );
554                let seq = gen_seq();
555                let header = CmdHeader::<LEN, CMD>::new(
556                    version,
557                    false,
558                    Some(seq),
559                    cmd,
560                    LEN::from_u64(len as u64).unwrap(),
561                );
562                let buf = header
563                    .to_vec()
564                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
565                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
566                let waiter = self
567                    .resp_waiter
568                    .create_timeout_result_future(resp_id, timeout)
569                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
570                {
571                    let mut send = conn.send.get().await;
572                    if buf.len() > 255 {
573                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
574                    }
575                    send.write_u8(buf.len() as u8)
576                        .await
577                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
578                    send.write_all(buf.as_slice())
579                        .await
580                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
581                    for b in body.iter() {
582                        send.write_all(b)
583                            .await
584                            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
585                    }
586                    send.flush()
587                        .await
588                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
589                }
590                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
591                Ok(body)
592            }
593            .await;
594            if ret.is_ok() {
595                return ret;
596            }
597        }
598        Err(cmd_err!(
599            CmdErrorCode::Failed,
600            "send to peer_id: {}",
601            peer_id
602        ))
603    }
604
605    async fn send_cmd(
606        &self,
607        peer_id: &PeerId,
608        cmd: CMD,
609        version: u8,
610        body: CmdBody,
611    ) -> CmdResult<()> {
612        let body_data = body.into_bytes().await?;
613        let body = body_data.as_slice();
614        let connections = self.peer_manager.find_connections(peer_id);
615        for conn in connections {
616            let ret: CmdResult<()> = async move {
617                log::debug!(
618                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
619                    peer_id,
620                    conn.conn_id,
621                    cmd,
622                    body.len(),
623                    hex::encode(body)
624                );
625                let header = CmdHeader::<LEN, CMD>::new(
626                    version,
627                    false,
628                    None,
629                    cmd,
630                    LEN::from_u64(body.len() as u64).unwrap(),
631                );
632                let buf = header
633                    .to_vec()
634                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
635                let mut send = conn.send.get().await;
636                if buf.len() > 255 {
637                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
638                }
639                send.write_u8(buf.len() as u8)
640                    .await
641                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
642                send.write_all(buf.as_slice())
643                    .await
644                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
645                send.write_all(body)
646                    .await
647                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
648                send.flush()
649                    .await
650                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
651                Ok(())
652            }
653            .await;
654            if ret.is_ok() {
655                break;
656            }
657        }
658        Ok(())
659    }
660
661    async fn send_cmd_with_resp(
662        &self,
663        peer_id: &PeerId,
664        cmd: CMD,
665        version: u8,
666        body: CmdBody,
667        timeout: Duration,
668    ) -> CmdResult<CmdBody> {
669        let connections = self.peer_manager.find_connections(peer_id);
670        let body_data = body.into_bytes().await?;
671        let data_ref = body_data.as_slice();
672        for conn in connections {
673            if let Some(id) = tokio::task::try_id() {
674                if self.state_holder.has_state(id) {
675                    continue;
676                }
677            }
678            let ret: CmdResult<CmdBody> = async move {
679                log::debug!(
680                    "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}",
681                    peer_id,
682                    conn.conn_id,
683                    cmd,
684                    data_ref.len()
685                );
686                let seq = gen_seq();
687                let header = CmdHeader::<LEN, CMD>::new(
688                    version,
689                    false,
690                    Some(seq),
691                    cmd,
692                    LEN::from_u64(data_ref.len() as u64).unwrap(),
693                );
694                let buf = header
695                    .to_vec()
696                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
697                let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
698                let waiter = self
699                    .resp_waiter
700                    .create_timeout_result_future(resp_id, timeout)
701                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
702                {
703                    let mut send = conn.send.get().await;
704                    if buf.len() > 255 {
705                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
706                    }
707                    send.write_u8(buf.len() as u8)
708                        .await
709                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
710                    send.write_all(buf.as_slice())
711                        .await
712                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
713                    send.write_all(data_ref)
714                        .await
715                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
716                    send.flush()
717                        .await
718                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
719                }
720                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
721                Ok(body)
722            }
723            .await;
724            if ret.is_ok() {
725                return ret;
726            }
727        }
728        Err(cmd_err!(
729            CmdErrorCode::Failed,
730            "send to peer_id: {}",
731            peer_id
732        ))
733    }
734
735    async fn send_by_specify_tunnel(
736        &self,
737        peer_id: &PeerId,
738        tunnel_id: TunnelId,
739        cmd: CMD,
740        version: u8,
741        body: &[u8],
742    ) -> CmdResult<()> {
743        let conn = self.peer_manager.find_connection(tunnel_id);
744        if conn.is_none() {
745            return Err(cmd_err!(
746                CmdErrorCode::PeerConnectionNotFound,
747                "tunnel_id: {:?}",
748                tunnel_id
749            ));
750        }
751        let conn = conn.unwrap();
752        assert_eq!(tunnel_id, conn.conn_id);
753        log::trace!(
754            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
755            peer_id,
756            conn.conn_id,
757            cmd,
758            body.len(),
759            hex::encode(body)
760        );
761        let header = CmdHeader::<LEN, CMD>::new(
762            version,
763            false,
764            None,
765            cmd,
766            LEN::from_u64(body.len() as u64).unwrap(),
767        );
768        let buf = header
769            .to_vec()
770            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
771        let mut send = conn.send.get().await;
772        if buf.len() > 255 {
773            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
774        }
775        send.write_u8(buf.len() as u8)
776            .await
777            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
778        send.write_all(buf.as_slice())
779            .await
780            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
781        send.write_all(body)
782            .await
783            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
784        send.flush()
785            .await
786            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
787        Ok(())
788    }
789
790    async fn send_by_specify_tunnel_with_resp(
791        &self,
792        peer_id: &PeerId,
793        tunnel_id: TunnelId,
794        cmd: CMD,
795        version: u8,
796        body: &[u8],
797        timeout: Duration,
798    ) -> CmdResult<CmdBody> {
799        let conn = self.peer_manager.find_connection(tunnel_id);
800        if conn.is_none() {
801            return Err(cmd_err!(
802                CmdErrorCode::PeerConnectionNotFound,
803                "tunnel_id: {:?}",
804                tunnel_id
805            ));
806        }
807        let conn = conn.unwrap();
808        if let Some(id) = tokio::task::try_id() {
809            if self.state_holder.has_state(id) {
810                return Err(cmd_err!(
811                    CmdErrorCode::Failed,
812                    "can't send msg with resp in tunnel {:?} msg handle",
813                    conn.conn_id
814                ));
815            }
816        }
817        assert_eq!(tunnel_id, conn.conn_id);
818        log::trace!(
819            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
820            peer_id,
821            conn.conn_id,
822            cmd,
823            body.len(),
824            hex::encode(body)
825        );
826        let seq = gen_seq();
827        let header = CmdHeader::<LEN, CMD>::new(
828            version,
829            false,
830            Some(seq),
831            cmd,
832            LEN::from_u64(body.len() as u64).unwrap(),
833        );
834        let buf = header
835            .to_vec()
836            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
837        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
838        let waiter = self
839            .resp_waiter
840            .create_timeout_result_future(resp_id, timeout)
841            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
842        {
843            let mut send = conn.send.get().await;
844            if buf.len() > 255 {
845                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
846            }
847            send.write_u8(buf.len() as u8)
848                .await
849                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
850            send.write_all(buf.as_slice())
851                .await
852                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
853            send.write_all(body)
854                .await
855                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
856            send.flush()
857                .await
858                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
859        }
860        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
861        Ok(body)
862    }
863
864    async fn send2_by_specify_tunnel(
865        &self,
866        peer_id: &PeerId,
867        tunnel_id: TunnelId,
868        cmd: CMD,
869        version: u8,
870        body: &[&[u8]],
871    ) -> CmdResult<()> {
872        let conn = self.peer_manager.find_connection(tunnel_id);
873        if conn.is_none() {
874            return Err(cmd_err!(
875                CmdErrorCode::PeerConnectionNotFound,
876                "tunnel_id: {:?}",
877                tunnel_id
878            ));
879        }
880        let conn = conn.unwrap();
881        assert_eq!(tunnel_id, conn.conn_id);
882        let mut len = 0;
883        for b in body.iter() {
884            len += b.len();
885            log::debug!(
886                "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
887                peer_id,
888                conn.conn_id,
889                cmd,
890                hex::encode(b)
891            );
892        }
893        log::debug!(
894            "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
895            peer_id,
896            conn.conn_id,
897            cmd,
898            len
899        );
900        let header = CmdHeader::<LEN, CMD>::new(
901            version,
902            false,
903            None,
904            cmd,
905            LEN::from_u64(len as u64).unwrap(),
906        );
907        let buf = header
908            .to_vec()
909            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
910        if buf.len() > 255 {
911            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
912        }
913        let mut send = conn.send.get().await;
914        send.write_u8(buf.len() as u8)
915            .await
916            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
917        send.write_all(buf.as_slice())
918            .await
919            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
920        for b in body.iter() {
921            send.write_all(b)
922                .await
923                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
924        }
925        send.flush()
926            .await
927            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
928        Ok(())
929    }
930
931    async fn send2_by_specify_tunnel_with_resp(
932        &self,
933        peer_id: &PeerId,
934        tunnel_id: TunnelId,
935        cmd: CMD,
936        version: u8,
937        body: &[&[u8]],
938        timeout: Duration,
939    ) -> CmdResult<CmdBody> {
940        let conn = self.peer_manager.find_connection(tunnel_id);
941        if conn.is_none() {
942            return Err(cmd_err!(
943                CmdErrorCode::PeerConnectionNotFound,
944                "tunnel_id: {:?}",
945                tunnel_id
946            ));
947        }
948        let conn = conn.unwrap();
949        if let Some(id) = tokio::task::try_id() {
950            if self.state_holder.has_state(id) {
951                return Err(cmd_err!(
952                    CmdErrorCode::Failed,
953                    "can't send msg with resp in tunnel {:?} msg handle",
954                    conn.conn_id
955                ));
956            }
957        }
958        assert_eq!(tunnel_id, conn.conn_id);
959        let mut len = 0;
960        for b in body.iter() {
961            len += b.len();
962            log::debug!(
963                "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
964                peer_id,
965                conn.conn_id,
966                cmd,
967                hex::encode(b)
968            );
969        }
970        log::debug!(
971            "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
972            peer_id,
973            conn.conn_id,
974            cmd,
975            len
976        );
977        let seq = gen_seq();
978        let header = CmdHeader::<LEN, CMD>::new(
979            version,
980            false,
981            Some(seq),
982            cmd,
983            LEN::from_u64(len as u64).unwrap(),
984        );
985        let buf = header
986            .to_vec()
987            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
988        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
989        let waiter = self
990            .resp_waiter
991            .create_timeout_result_future(resp_id, timeout)
992            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
993        if buf.len() > 255 {
994            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
995        }
996        {
997            let mut send = conn.send.get().await;
998            send.write_u8(buf.len() as u8)
999                .await
1000                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1001            send.write_all(buf.as_slice())
1002                .await
1003                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1004            for b in body.iter() {
1005                send.write_all(b)
1006                    .await
1007                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1008            }
1009            send.flush()
1010                .await
1011                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1012        }
1013        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1014        Ok(body)
1015    }
1016
1017    async fn send_cmd_by_specify_tunnel(
1018        &self,
1019        peer_id: &PeerId,
1020        tunnel_id: TunnelId,
1021        cmd: CMD,
1022        version: u8,
1023        mut body: CmdBody,
1024    ) -> CmdResult<()> {
1025        let conn = self.peer_manager.find_connection(tunnel_id);
1026        if conn.is_none() {
1027            return Err(cmd_err!(
1028                CmdErrorCode::PeerConnectionNotFound,
1029                "tunnel_id: {:?}",
1030                tunnel_id
1031            ));
1032        }
1033        let conn = conn.unwrap();
1034        assert_eq!(tunnel_id, conn.conn_id);
1035        log::debug!(
1036            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1037            peer_id,
1038            conn.conn_id,
1039            cmd,
1040            body.len()
1041        );
1042        let header = CmdHeader::<LEN, CMD>::new(
1043            version,
1044            false,
1045            None,
1046            cmd,
1047            LEN::from_u64(body.len()).unwrap(),
1048        );
1049        let buf = header
1050            .to_vec()
1051            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1052        let mut send = conn.send.get().await;
1053        if buf.len() > 255 {
1054            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1055        }
1056        send.write_u8(buf.len() as u8)
1057            .await
1058            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1059        send.write_all(buf.as_slice())
1060            .await
1061            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1062        tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1063            .await
1064            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1065        send.flush()
1066            .await
1067            .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1068        Ok(())
1069    }
1070
1071    async fn send_cmd_by_specify_tunnel_with_resp(
1072        &self,
1073        peer_id: &PeerId,
1074        tunnel_id: TunnelId,
1075        cmd: CMD,
1076        version: u8,
1077        mut body: CmdBody,
1078        timeout: Duration,
1079    ) -> CmdResult<CmdBody> {
1080        let conn = self.peer_manager.find_connection(tunnel_id);
1081        if conn.is_none() {
1082            return Err(cmd_err!(
1083                CmdErrorCode::PeerConnectionNotFound,
1084                "tunnel_id: {:?}",
1085                tunnel_id
1086            ));
1087        }
1088        let conn = conn.unwrap();
1089        if let Some(id) = tokio::task::try_id() {
1090            if self.state_holder.has_state(id) {
1091                return Err(cmd_err!(
1092                    CmdErrorCode::Failed,
1093                    "can't send msg with resp in tunnel {:?} msg handle",
1094                    conn.conn_id
1095                ));
1096            }
1097        }
1098        assert_eq!(tunnel_id, conn.conn_id);
1099        log::debug!(
1100            "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1101            peer_id,
1102            conn.conn_id,
1103            cmd,
1104            body.len()
1105        );
1106        let seq = gen_seq();
1107        let header = CmdHeader::<LEN, CMD>::new(
1108            version,
1109            false,
1110            Some(seq),
1111            cmd,
1112            LEN::from_u64(body.len()).unwrap(),
1113        );
1114        let buf = header
1115            .to_vec()
1116            .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1117        let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1118        let waiter = self
1119            .resp_waiter
1120            .create_timeout_result_future(resp_id, timeout)
1121            .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1122        {
1123            let mut send = conn.send.get().await;
1124            if buf.len() > 255 {
1125                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1126            }
1127            send.write_u8(buf.len() as u8)
1128                .await
1129                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1130            send.write_all(buf.as_slice())
1131                .await
1132                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1133            tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1134                .await
1135                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1136            send.flush()
1137                .await
1138                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1139        }
1140        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1141        Ok(body)
1142    }
1143
1144    async fn send_by_all_tunnels(
1145        &self,
1146        peer_id: &PeerId,
1147        cmd: CMD,
1148        version: u8,
1149        body: &[u8],
1150    ) -> CmdResult<()> {
1151        let connections = self.peer_manager.find_connections(peer_id);
1152        for conn in connections {
1153            let _ret: CmdResult<()> = async move {
1154                let header = CmdHeader::<LEN, CMD>::new(
1155                    version,
1156                    false,
1157                    None,
1158                    cmd,
1159                    LEN::from_u64(body.len() as u64).unwrap(),
1160                );
1161                let buf = header
1162                    .to_vec()
1163                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1164                let mut send = conn.send.get().await;
1165                send.write_u8(buf.len() as u8)
1166                    .await
1167                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1168                send.write_all(buf.as_slice())
1169                    .await
1170                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1171                send.write_all(body)
1172                    .await
1173                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1174                send.flush()
1175                    .await
1176                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1177                Ok(())
1178            }
1179            .await;
1180        }
1181        Ok(())
1182    }
1183
1184    async fn send2_by_all_tunnels(
1185        &self,
1186        peer_id: &PeerId,
1187        cmd: CMD,
1188        version: u8,
1189        body: &[&[u8]],
1190    ) -> CmdResult<()> {
1191        let connections = self.peer_manager.find_connections(peer_id);
1192        let mut len = 0;
1193        for b in body.iter() {
1194            len += b.len();
1195        }
1196        for conn in connections {
1197            let _ret: CmdResult<()> = async move {
1198                let header = CmdHeader::<LEN, CMD>::new(
1199                    version,
1200                    false,
1201                    None,
1202                    cmd,
1203                    LEN::from_u64(len as u64).unwrap(),
1204                );
1205                let buf = header
1206                    .to_vec()
1207                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1208                if buf.len() > 255 {
1209                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1210                }
1211                let mut send = conn.send.get().await;
1212                send.write_u8(buf.len() as u8)
1213                    .await
1214                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1215                send.write_all(buf.as_slice())
1216                    .await
1217                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1218                for b in body.iter() {
1219                    send.write_all(b)
1220                        .await
1221                        .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1222                }
1223                send.flush()
1224                    .await
1225                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1226                Ok(())
1227            }
1228            .await;
1229        }
1230        Ok(())
1231    }
1232}