sfo_cmd_server/server/
server.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3use std::ops::DerefMut;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6use async_named_locker::{NamedStateHolder, ObjectHolder};
7use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
8use num::{FromPrimitive, ToPrimitive};
9use sfo_split::Splittable;
10use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
11use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
12use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
13use crate::client::{gen_resp_id, gen_seq, RespWaiter, RespWaiterRef};
14use crate::errors::{cmd_err, into_cmd_err, CmdErrorCode, CmdResult};
15use crate::peer_connection::PeerConnection;
16use crate::peer_id::PeerId;
17use crate::server::CmdServer;
18use super::peer_manager::{PeerManager, PeerManagerRef};
19
20#[async_trait::async_trait]
21pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>: Send + Sync + 'static {
22    async fn accept(&self) -> CmdResult<Splittable<R, W>>;
23}
24
25#[async_trait::async_trait]
26pub trait CmdServerEventListener: Send + Sync + 'static {
27    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
28    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
29}
30
31#[derive(Clone)]
32struct CmdServerEventListenerEmit {
33    listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
34}
35
36impl CmdServerEventListenerEmit {
37    pub fn new() -> Self {
38        Self {
39            listeners: Arc::new(Mutex::new(Vec::new())),
40        }
41    }
42
43    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
44        self.listeners.lock().unwrap().push(event_listener);
45    }
46}
47
48#[async_trait::async_trait]
49impl CmdServerEventListener for CmdServerEventListenerEmit {
50    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
51        let listeners = {
52            self.listeners.lock().unwrap().clone()
53        };
54        for listener in listeners.iter() {
55            if let Err(e) = listener.on_peer_connected(peer_id).await {
56                log::error!("on_peer_connected error: {:?}", e);
57            }
58        }
59        Ok(())
60    }
61
62    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
63        let listeners = {
64            self.listeners.lock().unwrap().clone()
65        };
66        for listener in listeners.iter() {
67            if let Err(e) = listener.on_peer_disconnected(peer_id).await {
68                log::error!("on_peer_disconnected error: {:?}", e);
69            }
70        }
71        Ok(())
72    }
73}
74
75pub struct DefaultCmdServer<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> {
76    tunnel_listener: LISTENER,
77    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
78    peer_manager: PeerManagerRef<M, R, W>,
79    event_emit: CmdServerEventListenerEmit,
80    resp_waiter: RespWaiterRef,
81    state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
82    _l: Mutex<std::marker::PhantomData<(R, W, LEN, CMD)>>,
83}
84
85impl<M: CmdTunnelMeta,
86    R: CmdTunnelRead<M>,
87    W: CmdTunnelWrite<M>,
88    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
89    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash + Debug,
90    LISTENER: CmdTunnelListener<M, R, W>> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER> {
91    pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
92        let event_emit = CmdServerEventListenerEmit::new();
93        Arc::new(Self {
94            tunnel_listener,
95            cmd_handler_map: Arc::new(CmdHandlerMap::new()),
96            peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
97            event_emit,
98            resp_waiter: Arc::new(RespWaiter::new()),
99            state_holder: NamedStateHolder::new(),
100            _l: Default::default(),
101        })
102    }
103
104    pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
105        self.event_emit.attach_event_listener(event_listener);
106    }
107
108    pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
109        let connections = self.peer_manager.find_connections(peer_id);
110        connections
111    }
112
113    pub fn start(self: &Arc<Self>) {
114        let this = self.clone();
115        tokio::spawn(async move {
116            if let Err(e) = this.run().await {
117                log::error!("cmd server error: {:?}", e);
118            }
119        });
120    }
121
122    async fn run(self: &Arc<Self>) -> CmdResult<()> {
123        loop {
124            let tunnel = self.tunnel_listener.accept().await?;
125            let peer_id = tunnel.get_remote_peer_id();
126            let tunnel_id = self.peer_manager.generate_conn_id();
127            let this = self.clone();
128            let resp_waiter = self.resp_waiter.clone();
129            let state_holder = self.state_holder.clone();
130            tokio::spawn(async move {
131                let remote_id = peer_id.clone();
132                let ret: CmdResult<()> = async move {
133                    let this = this.clone();
134                    let cmd_handler_map = this.cmd_handler_map.clone();
135                    let (mut reader, writer) = tunnel.split();
136                    let writer = ObjectHolder::new(writer);
137                    let resp_write = writer.clone();
138                    let resp_waiter = resp_waiter.clone();
139                    let state_holder = state_holder.clone();
140                    let recv_handle = tokio::spawn(async move {
141                        let ret: CmdResult<()> = async move {
142                            loop {
143                                let header_len = reader.read_u8().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
144                                let mut header = vec![0u8; header_len as usize];
145                                let n = reader.read_exact(&mut header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
146                                if n == 0 {
147                                    break;
148                                }
149                                let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
150                                sfo_log::debug!("recv cmd {:?}", header.cmd_code());
151                                let body_len = header.pkg_len().to_u64().unwrap();
152                                let cmd_read = CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
153                                let waiter = cmd_read.get_waiter();
154                                let future = waiter.create_result_future().map_err(into_cmd_err!(CmdErrorCode::Failed))?;
155                                {
156                                    let body_read = cmd_read;
157                                    let body = CmdBody::from_reader(BufReader::new(body_read), body_len);
158                                    if header.is_resp() && header.seq().is_some() {
159                                        let resp_id = gen_resp_id(header.cmd_code(), header.seq().unwrap());
160                                        let _ = resp_waiter.set_result(resp_id, body);
161                                    } else {
162                                        if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
163                                            let version = header.version();
164                                            let seq = header.seq();
165                                            let cmd_code = header.cmd_code();
166                                            match {
167                                                let _handle_state = state_holder.new_state(tokio::task::id());
168                                                handler.handle(remote_id.clone(), tunnel_id, header, body).await
169                                            } {
170                                                Ok(Some(mut body)) => {
171                                                    let mut write = resp_write.get().await;
172                                                    let header = CmdHeader::<LEN, CMD>::new(version, true, seq, cmd_code, LEN::from_u64(body.len()).unwrap());
173                                                    let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
174                                                    if buf.len() > 255 {
175                                                        return Err(cmd_err!(CmdErrorCode::RawCodecError, "header len too large"));
176                                                    }
177                                                    write.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
178                                                    write.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
179                                                    tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
180                                                    write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
181                                                }
182                                                Err(e) => {
183                                                    log::error!("handle cmd error: {:?}", e);
184                                                }
185                                                _ => {}
186                                            }
187                                        }
188                                    }
189                                };
190                                reader = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
191                                // }
192                            }
193                            Ok(())
194                        }.await;
195                        if ret.is_err() {
196                            log::error!("recv cmd error: {:?}", ret.as_ref().err().unwrap());
197                        }
198                        ret
199                    });
200
201                    let peer_conn = PeerConnection {
202                        conn_id: tunnel_id,
203                        peer_id: peer_id.clone(),
204                        send: writer,
205                        handle: Some(recv_handle),
206                    };
207                    this.peer_manager.add_peer_connection(peer_conn).await;
208                    Ok(())
209                }.await;
210                if let Err(e) = ret {
211                    log::error!("peer connection error: {:?}", e);
212                }
213            });
214        }
215    }
216}
217
218#[async_trait::async_trait]
219impl<M: CmdTunnelMeta,
220    R: CmdTunnelRead<M>,
221    W: CmdTunnelWrite<M>,
222    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
223    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash + Debug,
224    LISTENER: CmdTunnelListener<M, R, W>> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER> {
225    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
226        self.cmd_handler_map.insert(cmd, handler);
227    }
228
229    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
230        let connections = self.peer_manager.find_connections(peer_id);
231        for conn in connections {
232            let ret: CmdResult<()> = async move {
233                log::debug!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
234                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
235                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
236                let mut send = conn.send.get().await;
237                if buf.len() > 255 {
238                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
239                }
240                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
241                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
242                send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
243                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
244                Ok(())
245            }.await;
246            if ret.is_ok() {
247                break;
248            }
249        }
250        Ok(())
251    }
252
253    async fn send_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
254        let connections = self.peer_manager.find_connections(peer_id);
255        for conn in connections {
256            if let Some(id) = tokio::task::try_id() {
257                if self.state_holder.has_state(id) {
258                    continue;
259                }
260            }
261            let ret: CmdResult<CmdBody> = async move {
262                log::debug!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
263                let seq = gen_seq();
264                let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
265                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
266                let resp_id = gen_resp_id(cmd, seq);
267                let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
268                {
269                    let mut send = conn.send.get().await;
270                    if buf.len() > 255 {
271                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
272                    }
273                    send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
274                    send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
275                    send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
276                    send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
277                }
278                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
279                Ok(body )
280            }.await;
281            if ret.is_ok() {
282                return ret;
283            } else {
284                sfo_log::error!("send err {:?}", ret.unwrap_err());
285            }
286        }
287        Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
288    }
289
290    async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
291        let connections = self.peer_manager.find_connections(peer_id);
292        for conn in connections {
293            let ret: CmdResult<()> = async move {
294                let mut len = 0;
295                for b in body.iter() {
296                    len += b.len();
297                    log::debug!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
298                }
299                log::debug!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
300                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
301                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
302                let mut send = conn.send.get().await;
303                if buf.len() > 255 {
304                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
305                }
306                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
307                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
308                for b in body.iter() {
309                    send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
310                }
311                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
312                Ok(())
313            }.await;
314            if ret.is_ok() {
315                break;
316            }
317        }
318        Ok(())
319    }
320
321    async fn send2_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
322        let connections = self.peer_manager.find_connections(peer_id);
323        for conn in connections {
324            if let Some(id) = tokio::task::try_id() {
325                if self.state_holder.has_state(id) {
326                    continue;
327                }
328            }
329            let ret: CmdResult<CmdBody> = async move {
330                let mut len = 0;
331                for b in body.iter() {
332                    len += b.len();
333                    log::debug!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
334                }
335                log::debug!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
336                let seq = gen_seq();
337                let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
338                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
339                let resp_id = gen_resp_id(cmd, seq);
340                let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
341                {
342                    let mut send = conn.send.get().await;
343                    if buf.len() > 255 {
344                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
345                    }
346                    send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
347                    send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
348                    for b in body.iter() {
349                        send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
350                    }
351                    send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
352                }
353                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
354                Ok(body)
355            }.await;
356            if ret.is_ok() {
357                return ret;
358            }
359        }
360        Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
361    }
362
363    async fn send_cmd(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
364        let body_data = body.into_bytes().await?;
365        let body = body_data.as_slice();
366        let connections = self.peer_manager.find_connections(peer_id);
367        for conn in connections {
368            let ret: CmdResult<()> = async move {
369                log::debug!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
370                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
371                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
372                let mut send = conn.send.get().await;
373                if buf.len() > 255 {
374                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
375                }
376                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
377                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
378                send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
379                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
380                Ok(())
381            }.await;
382            if ret.is_ok() {
383                break;
384            }
385        }
386        Ok(())
387    }
388
389    async fn send_cmd_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
390        let connections = self.peer_manager.find_connections(peer_id);
391        let body_data = body.into_bytes().await?;
392        let data_ref = body_data.as_slice();
393        for conn in connections {
394            if let Some(id) = tokio::task::try_id() {
395                if self.state_holder.has_state(id) {
396                    continue;
397                }
398            }
399            let ret: CmdResult<CmdBody> = async move {
400                log::debug!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, data_ref.len());
401                let seq = gen_seq();
402                let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(data_ref.len() as u64).unwrap());
403                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
404                let resp_id = gen_resp_id(cmd, seq);
405                let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
406                {
407                    let mut send = conn.send.get().await;
408                    if buf.len() > 255 {
409                        return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
410                    }
411                    send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
412                    send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
413                    send.write_all(data_ref).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
414                    send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
415                }
416                let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
417                Ok(body )
418            }.await;
419            if ret.is_ok() {
420                return ret;
421            }
422        }
423        Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
424    }
425
426    async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
427        let conn = self.peer_manager.find_connection(tunnel_id);
428        if conn.is_none() {
429            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
430        }
431        let conn = conn.unwrap();
432        assert_eq!(tunnel_id, conn.conn_id);
433        log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
434        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
435        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
436        let mut send = conn.send.get().await;
437        if buf.len() > 255 {
438            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
439        }
440        send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
441        send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
442        send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
443        send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
444        Ok(())
445    }
446
447    async fn send_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
448        let conn = self.peer_manager.find_connection(tunnel_id);
449        if conn.is_none() {
450            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
451        }
452        let conn = conn.unwrap();
453        if let Some(id) = tokio::task::try_id() {
454            if self.state_holder.has_state(id) {
455                return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
456            }
457        }
458        assert_eq!(tunnel_id, conn.conn_id);
459        log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
460        let seq = gen_seq();
461        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
462        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
463        let resp_id = gen_resp_id(cmd, seq);
464        let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
465        {
466            let mut send = conn.send.get().await;
467            if buf.len() > 255 {
468                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
469            }
470            send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
471            send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
472            send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
473            send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
474        }
475        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
476        Ok(body)
477    }
478
479    async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
480        let conn = self.peer_manager.find_connection(tunnel_id);
481        if conn.is_none() {
482            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
483        }
484        let conn = conn.unwrap();
485        assert_eq!(tunnel_id, conn.conn_id);
486        let mut len = 0;
487        for b in body.iter() {
488            len += b.len();
489            log::debug!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
490        }
491        log::debug!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
492        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
493        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
494        if buf.len() > 255 {
495            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
496        }
497        let mut send = conn.send.get().await;
498        send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
499        send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
500        for b in body.iter() {
501            send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
502        }
503        send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
504        Ok(())
505    }
506
507    async fn send2_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
508        let conn = self.peer_manager.find_connection(tunnel_id);
509        if conn.is_none() {
510            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
511        }
512        let conn = conn.unwrap();
513        if let Some(id) = tokio::task::try_id() {
514            if self.state_holder.has_state(id) {
515                return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
516            }
517        }
518        assert_eq!(tunnel_id, conn.conn_id);
519        let mut len = 0;
520        for b in body.iter() {
521            len += b.len();
522            log::debug!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
523        }
524        log::debug!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
525        let seq = gen_seq();
526        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
527        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
528        let resp_id = gen_resp_id(cmd, seq);
529        let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
530        if buf.len() > 255 {
531            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
532        }
533        {
534            let mut send = conn.send.get().await;
535            send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
536            send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
537            for b in body.iter() {
538                send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
539            }
540            send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
541        }
542        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
543        Ok(body)
544    }
545
546    async fn send_cmd_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, mut body: CmdBody) -> CmdResult<()> {
547        let conn = self.peer_manager.find_connection(tunnel_id);
548        if conn.is_none() {
549            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
550        }
551        let conn = conn.unwrap();
552        assert_eq!(tunnel_id, conn.conn_id);
553        log::debug!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, body.len());
554        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len()).unwrap());
555        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
556        let mut send = conn.send.get().await;
557        if buf.len() > 255 {
558            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
559        }
560        send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
561        send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
562        tokio::io::copy(&mut body, send.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
563        send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
564        Ok(())
565    }
566
567    async fn send_cmd_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, mut body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
568        let conn = self.peer_manager.find_connection(tunnel_id);
569        if conn.is_none() {
570            return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
571        }
572        let conn = conn.unwrap();
573        if let Some(id) = tokio::task::try_id() {
574            if self.state_holder.has_state(id) {
575                return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
576            }
577        }
578        assert_eq!(tunnel_id, conn.conn_id);
579        log::debug!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, body.len());
580        let seq = gen_seq();
581        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len()).unwrap());
582        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
583        let resp_id = gen_resp_id(cmd, seq);
584        let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
585        {
586            let mut send = conn.send.get().await;
587            if buf.len() > 255 {
588                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
589            }
590            send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
591            send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
592            tokio::io::copy(&mut body, send.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
593            send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
594        }
595        let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
596        Ok(body)
597    }
598
599    async fn send_by_all_tunnels(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
600        let connections = self.peer_manager.find_connections(peer_id);
601        for conn in connections {
602            let _ret: CmdResult<()> = async move {
603                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
604                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
605                let mut send = conn.send.get().await;
606                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
607                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
608                send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
609                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
610                Ok(())
611            }.await;
612        }
613        Ok(())
614    }
615
616    async fn send2_by_all_tunnels(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
617        let connections = self.peer_manager.find_connections(peer_id);
618        let mut len = 0;
619        for b in body.iter() {
620            len += b.len();
621        }
622        for conn in connections {
623            let _ret: CmdResult<()> = async move {
624                let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
625                let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
626                if buf.len() > 255 {
627                    return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
628                }
629                let mut send = conn.send.get().await;
630                send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
631                send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
632                for b in body.iter() {
633                    send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
634                }
635                send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
636                Ok(())
637            }.await;
638        }
639        Ok(())
640    }
641}