sfo_cmd_server/server/
server.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3use std::ops::DerefMut;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6use async_named_locker::{NamedStateHolder, ObjectHolder};
7use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
8use num::{FromPrimitive, ToPrimitive};
9use sfo_split::Splittable;
10use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
11use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
12use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
13use crate::client::{gen_resp_id, gen_seq, RespWaiter, RespWaiterRef};
14use crate::errors::{cmd_err, into_cmd_err, CmdErrorCode, CmdResult};
15use crate::peer_connection::PeerConnection;
16use crate::peer_id::PeerId;
17use crate::server::CmdServer;
18use super::peer_manager::{PeerManager, PeerManagerRef};
19
20#[async_trait::async_trait]
21pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>: Send + Sync + 'static {
22    async fn accept(&self) -> CmdResult<Splittable<R, W>>;
23}
24
25#[async_trait::async_trait]
26pub trait CmdServerEventListener: Send + Sync + 'static {
27    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
28    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
29}
30
31#[derive(Clone)]
32struct CmdServerEventListenerEmit {
33    listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
34}
35
36impl CmdServerEventListenerEmit {
37    pub fn new() -> Self {
38        Self {
39            listeners: Arc::new(Mutex::new(Vec::new())),
40        }
41    }
42
43    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
44        self.listeners.lock().unwrap().push(event_listener);
45    }
46}
47
48#[async_trait::async_trait]
49impl CmdServerEventListener for CmdServerEventListenerEmit {
50    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
51        let listeners = {
52            self.listeners.lock().unwrap().clone()
53        };
54        for listener in listeners.iter() {
55            if let Err(e) = listener.on_peer_connected(peer_id).await {
56                log::error!("on_peer_connected error: {:?}", e);
57            }
58        }
59        Ok(())
60    }
61
62    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
63        let listeners = {
64            self.listeners.lock().unwrap().clone()
65        };
66        for listener in listeners.iter() {
67            if let Err(e) = listener.on_peer_disconnected(peer_id).await {
68                log::error!("on_peer_disconnected error: {:?}", e);
69            }
70        }
71        Ok(())
72    }
73}
74
75pub struct DefaultCmdServer<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> {
76    tunnel_listener: LISTENER,
77    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
78    peer_manager: PeerManagerRef<M, R, W>,
79    event_emit: CmdServerEventListenerEmit,
80    resp_waiter: RespWaiterRef,
81    state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
82    _l: Mutex<std::marker::PhantomData<(R, W, LEN, CMD)>>,
83}
84
85impl<M: CmdTunnelMeta,
86    R: CmdTunnelRead<M>,
87    W: CmdTunnelWrite<M>,
88    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
89    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash + Debug,
90    LISTENER: CmdTunnelListener<M, R, W>> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER> {
91    pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
92        let event_emit = CmdServerEventListenerEmit::new();
93        Arc::new(Self {
94            tunnel_listener,
95            cmd_handler_map: Arc::new(CmdHandlerMap::new()),
96            peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
97            event_emit,
98            resp_waiter: Arc::new(RespWaiter::new()),
99            state_holder: NamedStateHolder::new(),
100            _l: Default::default(),
101        })
102    }
103
104    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
105        self.event_emit.attach_event_listener(event_listener);
106    }
107
108    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
109        let connections = self.peer_manager.find_connections(peer_id);
110        connections
111    }
112
113    pub fn start(self: &Arc<Self>) {
114        let this = self.clone();
115        tokio::spawn(async move {
116            if let Err(e) = this.run().await {
117                log::error!("cmd server error: {:?}", e);
118            }
119        });
120    }
121
122    async fn run(self: &Arc<Self>) -> CmdResult<()> {
123        loop {
124            let tunnel = self.tunnel_listener.accept().await?;
125            let peer_id = tunnel.get_remote_peer_id();
126            let tunnel_id = self.peer_manager.generate_conn_id();
127            let this = self.clone();
128            let resp_waiter = self.resp_waiter.clone();
129            let state_holder = self.state_holder.clone();
130            tokio::spawn(async move {
131                let remote_id = peer_id.clone();
132                let ret: CmdResult<()> = async move {
133                    let this = this.clone();
134                    let cmd_handler_map = this.cmd_handler_map.clone();
135                    let (mut reader, writer) = tunnel.split();
136                    let writer = ObjectHolder::new(writer);
137                    let resp_write = writer.clone();
138                    let resp_waiter = resp_waiter.clone();
139                    let state_holder = state_holder.clone();
140                    let recv_handle = tokio::spawn(async move {
141                        let ret: CmdResult<()> = async move {
142                            loop {
143                                let header_len = reader.read_u8().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
144                                let mut header = vec![0u8; header_len as usize];
145                                let n = reader.read_exact(&mut header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
146                                if n == 0 {
147                                    break;
148                                }
149                                let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
150                                println!("recv cmd {:?}", header.cmd_code());
151                                let body_len = header.pkg_len().to_u64().unwrap();
152                                let cmd_read = CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
153                                let waiter = cmd_read.get_waiter();
154                                let future = waiter.create_result_future().map_err(into_cmd_err!(CmdErrorCode::Failed))?;
155                                {
156                                    let body_read = cmd_read;
157                                    let body = CmdBody::from_reader(BufReader::new(body_read), body_len);
158                                    if header.is_resp() && header.seq().is_some() {
159                                        let resp_id = gen_resp_id(header.cmd_code(), header.seq().unwrap());
160                                        let _ = resp_waiter.set_result(resp_id, body);
161                                    } else {
162                                        if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
163                                            let version = header.version();
164                                            let seq = header.seq();
165                                            let cmd_code = header.cmd_code();
166                                            match {
167                                                let _handle_state = state_holder.new_state(tokio::task::id());
168                                                handler.handle(remote_id.clone(), tunnel_id, header, body).await
169                                            } {
170                                                Ok(Some(mut body)) => {
171                                                    let mut write = resp_write.get().await;
172                                                    let header = CmdHeader::<LEN, CMD>::new(version, true, seq, cmd_code, LEN::from_u64(body.len()).unwrap());
173                                                    let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
174                                                    if buf.len() > 255 {
175                                                        return Err(cmd_err!(CmdErrorCode::RawCodecError, "header len too large"));
176                                                    }
177                                                    write.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
178                                                    write.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
179                                                    tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
180                                                    write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
181                                                }
182                                                Err(e) => {
183                                                    log::error!("handle cmd error: {:?}", e);
184                                                }
185                                                _ => {}
186                                            }
187                                        }
188                                    }
189                                };
190                                reader = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
191                                // }
192                            }
193                            Ok(())
194                        }.await;
195                        ret
196                    });
197
198                    let peer_conn = PeerConnection {
199                        conn_id: tunnel_id,
200                        peer_id: peer_id.clone(),
201                        send: writer,
202                        handle: Some(recv_handle),
203                    };
204                    this.peer_manager.add_peer_connection(peer_conn).await;
205                    Ok(())
206                }.await;
207                if let Err(e) = ret {
208                    log::error!("peer connection error: {:?}", e);
209                }
210            });
211        }
212    }
213}
214
215#[async_trait::async_trait]
216impl<M: CmdTunnelMeta,
217    R: CmdTunnelRead<M>,
218    W: CmdTunnelWrite<M>,
219    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
220    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash + Debug,
221    LISTENER: CmdTunnelListener<M, R, W>> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER> {
222    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
223        self.cmd_handler_map.insert(cmd, handler);
224    }
225
226    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
227        let connections = self.peer_manager.find_connections(peer_id);
228        for conn in connections {
229            let ret: CmdResult<()> = async move {
230                log::trace!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
231                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
232                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
233                let mut send = conn.send.get().await;
234                if buf.len() > 255 {
235                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
236                }
237                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
238                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
239                send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
240                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
241                Ok(())
242            }.await;
243            if ret.is_ok() {
244                break;
245            }
246        }
247        Ok(())
248    }
249
250    async fn send_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
251        let connections = self.peer_manager.find_connections(peer_id);
252        for conn in connections {
253            if let Some(id) = tokio::task::try_id() {
254                if self.state_holder.has_state(id) {
255                    continue;
256                }
257            }
258            let ret: CmdResult<CmdBody> = async move {
259                log::trace!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
260                let seq = gen_seq();
261                let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
262                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
263                let resp_id = gen_resp_id(cmd, seq);
264                let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
265                {
266                    let mut send = conn.send.get().await;
267                    if buf.len() > 255 {
268                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
269                    }
270                    send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
271                    send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
272                    send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
273                    send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
274                }
275                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
276                Ok(body )
277            }.await;
278            if ret.is_ok() {
279                return ret;
280            } else {
281                println!("send err {:?}", ret.unwrap_err());
282            }
283        }
284        Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
285    }
286
287    async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
288        let connections = self.peer_manager.find_connections(peer_id);
289        for conn in connections {
290            let ret: CmdResult<()> = async move {
291                let mut len = 0;
292                for b in body.iter() {
293                    len += b.len();
294                    log::trace!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
295                }
296                log::trace!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
297                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
298                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
299                let mut send = conn.send.get().await;
300                if buf.len() > 255 {
301                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
302                }
303                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
304                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
305                for b in body.iter() {
306                    send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
307                }
308                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
309                Ok(())
310            }.await;
311            if ret.is_ok() {
312                break;
313            }
314        }
315        Ok(())
316    }
317
318    async fn send2_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
319        let connections = self.peer_manager.find_connections(peer_id);
320        for conn in connections {
321            if let Some(id) = tokio::task::try_id() {
322                if self.state_holder.has_state(id) {
323                    continue;
324                }
325            }
326            let ret: CmdResult<CmdBody> = async move {
327                let mut len = 0;
328                for b in body.iter() {
329                    len += b.len();
330                    log::trace!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
331                }
332                log::trace!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
333                let seq = gen_seq();
334                let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
335                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
336                let resp_id = gen_resp_id(cmd, seq);
337                let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
338                {
339                    let mut send = conn.send.get().await;
340                    if buf.len() > 255 {
341                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
342                    }
343                    send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
344                    send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
345                    for b in body.iter() {
346                        send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
347                    }
348                    send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
349                }
350                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
351                Ok(body)
352            }.await;
353            if ret.is_ok() {
354                return ret;
355            }
356        }
357        Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
358    }
359
360    async fn send_cmd(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
361        let body_data = body.into_bytes().await?;
362        let body = body_data.as_slice();
363        let connections = self.peer_manager.find_connections(peer_id);
364        for conn in connections {
365            let ret: CmdResult<()> = async move {
366                log::trace!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
367                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
368                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
369                let mut send = conn.send.get().await;
370                if buf.len() > 255 {
371                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
372                }
373                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
374                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
375                send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
376                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
377                Ok(())
378            }.await;
379            if ret.is_ok() {
380                break;
381            }
382        }
383        Ok(())
384    }
385
386    async fn send_cmd_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
387        let connections = self.peer_manager.find_connections(peer_id);
388        let body_data = body.into_bytes().await?;
389        let data_ref = body_data.as_slice();
390        for conn in connections {
391            if let Some(id) = tokio::task::try_id() {
392                if self.state_holder.has_state(id) {
393                    continue;
394                }
395            }
396            let ret: CmdResult<CmdBody> = async move {
397                log::trace!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, data_ref.len());
398                let seq = gen_seq();
399                let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(data_ref.len() as u64).unwrap());
400                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
401                let resp_id = gen_resp_id(cmd, seq);
402                let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
403                {
404                    let mut send = conn.send.get().await;
405                    if buf.len() > 255 {
406                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
407                    }
408                    send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
409                    send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
410                    send.write_all(data_ref).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
411                    send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
412                }
413                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
414                Ok(body )
415            }.await;
416            if ret.is_ok() {
417                return ret;
418            }
419        }
420        Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
421    }
422
423    async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
424        let conn = self.peer_manager.find_connection(tunnel_id);
425        if conn.is_none() {
426            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
427        }
428        let conn = conn.unwrap();
429        assert_eq!(tunnel_id, conn.conn_id);
430        log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
431        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
432        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
433        let mut send = conn.send.get().await;
434        if buf.len() > 255 {
435            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
436        }
437        send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
438        send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
439        send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
440        send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
441        Ok(())
442    }
443
444    async fn send_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
445        let conn = self.peer_manager.find_connection(tunnel_id);
446        if conn.is_none() {
447            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
448        }
449        let conn = conn.unwrap();
450        if let Some(id) = tokio::task::try_id() {
451            if self.state_holder.has_state(id) {
452                return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
453            }
454        }
455        assert_eq!(tunnel_id, conn.conn_id);
456        log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
457        let seq = gen_seq();
458        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
459        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
460        let resp_id = gen_resp_id(cmd, seq);
461        let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
462        {
463            let mut send = conn.send.get().await;
464            if buf.len() > 255 {
465                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
466            }
467            send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
468            send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
469            send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
470            send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
471        }
472        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
473        Ok(body)
474    }
475
476    async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
477        let conn = self.peer_manager.find_connection(tunnel_id);
478        if conn.is_none() {
479            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
480        }
481        let conn = conn.unwrap();
482        assert_eq!(tunnel_id, conn.conn_id);
483        let mut len = 0;
484        for b in body.iter() {
485            len += b.len();
486            log::trace!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
487        }
488        log::trace!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
489        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
490        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
491        if buf.len() > 255 {
492            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
493        }
494        let mut send = conn.send.get().await;
495        send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
496        send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
497        for b in body.iter() {
498            send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
499        }
500        send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
501        Ok(())
502    }
503
504    async fn send2_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
505        let conn = self.peer_manager.find_connection(tunnel_id);
506        if conn.is_none() {
507            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
508        }
509        let conn = conn.unwrap();
510        if let Some(id) = tokio::task::try_id() {
511            if self.state_holder.has_state(id) {
512                return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
513            }
514        }
515        assert_eq!(tunnel_id, conn.conn_id);
516        let mut len = 0;
517        for b in body.iter() {
518            len += b.len();
519            log::trace!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
520        }
521        log::trace!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
522        let seq = gen_seq();
523        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
524        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
525        let resp_id = gen_resp_id(cmd, seq);
526        let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
527        if buf.len() > 255 {
528            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
529        }
530        {
531            let mut send = conn.send.get().await;
532            send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
533            send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
534            for b in body.iter() {
535                send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
536            }
537            send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
538        }
539        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
540        Ok(body)
541    }
542
543    async fn send_cmd_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, mut body: CmdBody) -> CmdResult<()> {
544        let conn = self.peer_manager.find_connection(tunnel_id);
545        if conn.is_none() {
546            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
547        }
548        let conn = conn.unwrap();
549        assert_eq!(tunnel_id, conn.conn_id);
550        log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, body.len());
551        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len()).unwrap());
552        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
553        let mut send = conn.send.get().await;
554        if buf.len() > 255 {
555            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
556        }
557        send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
558        send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
559        tokio::io::copy(&mut body, send.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
560        send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
561        Ok(())
562    }
563
564    async fn send_cmd_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, mut body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
565        let conn = self.peer_manager.find_connection(tunnel_id);
566        if conn.is_none() {
567            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
568        }
569        let conn = conn.unwrap();
570        if let Some(id) = tokio::task::try_id() {
571            if self.state_holder.has_state(id) {
572                return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
573            }
574        }
575        assert_eq!(tunnel_id, conn.conn_id);
576        log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, body.len());
577        let seq = gen_seq();
578        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len()).unwrap());
579        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
580        let resp_id = gen_resp_id(cmd, seq);
581        let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
582        {
583            let mut send = conn.send.get().await;
584            if buf.len() > 255 {
585                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
586            }
587            send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
588            send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
589            tokio::io::copy(&mut body, send.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
590            send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
591        }
592        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
593        Ok(body)
594    }
595
596    async fn send_by_all_tunnels(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
597        let connections = self.peer_manager.find_connections(peer_id);
598        for conn in connections {
599            let _ret: CmdResult<()> = async move {
600                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
601                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
602                let mut send = conn.send.get().await;
603                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
604                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
605                send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
606                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
607                Ok(())
608            }.await;
609        }
610        Ok(())
611    }
612
613    async fn send2_by_all_tunnels(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
614        let connections = self.peer_manager.find_connections(peer_id);
615        let mut len = 0;
616        for b in body.iter() {
617            len += b.len();
618        }
619        for conn in connections {
620            let _ret: CmdResult<()> = async move {
621                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
622                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
623                if buf.len() > 255 {
624                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
625                }
626                let mut send = conn.send.get().await;
627                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
628                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
629                for b in body.iter() {
630                    send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
631                }
632                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
633                Ok(())
634            }.await;
635        }
636        Ok(())
637    }
638}