sfo_cmd_server/client/
client.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3use std::marker::PhantomData;
4use std::ops::{Deref, DerefMut};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use async_named_locker::ObjectHolder;
8use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
9use num::{FromPrimitive, ToPrimitive};
10use sfo_pool::{into_pool_err, pool_err, ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool, ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification};
11use sfo_split::{Splittable, WHalf};
12use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
13use tokio::spawn;
14use tokio::task::JoinHandle;
15use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId, TunnelIdGenerator};
16use crate::client::{gen_resp_id, gen_seq, CmdClient, CmdSend, RespWaiter, RespWaiterRef, SendGuard};
17use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
18use crate::errors::{cmd_err, into_cmd_err, CmdErrorCode, CmdResult};
19use crate::peer_id::PeerId;
20
21#[async_trait::async_trait]
22pub trait CmdTunnelFactory<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>: Send + Sync + 'static {
23    async fn create_tunnel(&self) -> CmdResult<Splittable<R, W>>;
24}
25
26pub struct CommonCmdSend<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD>
27where LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
28      CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
29    pub(crate) recv_handle: JoinHandle<CmdResult<()>>,
30    pub(crate) write: ObjectHolder<WHalf<R, W>>,
31    pub(crate) is_work: bool,
32    pub(crate) tunnel_id: TunnelId,
33    pub(crate) remote_id: PeerId,
34    pub(crate) resp_waiter: RespWaiterRef,
35    pub(crate) tunnel_meta: Option<Arc<M>>,
36    _p: std::marker::PhantomData<(LEN, CMD)>,
37
38}
39
40// impl<R, W, LEN, CMD> Deref for CmdSend<R, W, LEN, CMD>
41// where R: CmdTunnelRead,
42//       W: CmdTunnelWrite,
43//       LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
44//       CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
45//     type Target = W;
46//
47//     fn deref(&self) -> &Self::Target {
48//         self.write.deref()
49//     }
50// }
51
52impl<M, R, W, LEN, CMD> CommonCmdSend<M, R, W, LEN, CMD>
53where M: CmdTunnelMeta,
54      R: CmdTunnelRead<M>,
55      W: CmdTunnelWrite<M>,
56      LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
57      CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
58    pub fn new(tunnel_id: TunnelId,
59               recv_handle: JoinHandle<CmdResult<()>>,
60               write: ObjectHolder<WHalf<R, W>>,
61               resp_waiter: RespWaiterRef,
62               remote_id: PeerId,
63               tunnel_meta: Option<Arc<M>>) -> Self {
64        Self {
65            recv_handle,
66            write,
67            is_work: true,
68            tunnel_id,
69            remote_id,
70            resp_waiter,
71            tunnel_meta,
72            _p: Default::default(),
73        }
74    }
75
76    pub fn get_tunnel_id(&self) -> TunnelId {
77        self.tunnel_id
78    }
79
80    pub fn set_disable(&mut self) {
81        self.is_work = false;
82        self.recv_handle.abort();
83    }
84
85    pub async fn send(&mut self, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
86        log::trace!("client {:?} send cmd: {:?}, len: {} data:{}", self.tunnel_id, cmd, body.len(), hex::encode(body));
87        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
88        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
89        let ret = self.send_inner(buf.as_slice(), body).await;
90        if let Err(e) = ret {
91            self.set_disable();
92            return Err(e);
93        }
94        Ok(())
95    }
96
97    pub async fn send_with_resp(&mut self, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
98        if let Some(id) = tokio::task::try_id() {
99            if id == self.recv_handle.id() {
100                return Err(cmd_err!(CmdErrorCode::Failed, "can't send with resp in recv task"));
101            }
102        }
103        log::trace!("client {:?} send cmd: {:?}, len: {}, data: {}", self.tunnel_id, cmd, body.len(), hex::encode(body));
104        let seq = gen_seq();
105        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
106        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
107        let resp_id = gen_resp_id(cmd, seq);
108        let waiter = self.resp_waiter.clone();
109        let resp_waiter = waiter.create_timeout_result_future(resp_id, timeout)
110            .map_err(into_cmd_err!(CmdErrorCode::Failed, "create timeout result future error"))?;
111        let ret = self.send_inner(buf.as_slice(), body).await;
112        if let Err(e) = ret {
113            self.set_disable();
114            return Err(e);
115        }
116        let resp = resp_waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "recv resp error"))?;
117        Ok(resp)
118    }
119
120    pub async fn send2(&mut self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
121        let mut len = 0;
122        for b in body.iter() {
123            len += b.len();
124            log::trace!("client {:?} send2 cmd: {:?}, data {}", self.tunnel_id, cmd, hex::encode(b));
125        }
126        log::trace!("client {:?} send2 cmd: {:?}, len {}", self.tunnel_id, cmd, len);
127        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
128        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
129        let ret = self.send_inner2(buf.as_slice(), body).await;
130        if let Err(e) = ret {
131            self.set_disable();
132            return Err(e);
133        }
134        Ok(())
135    }
136
137    pub async fn send2_with_resp(&mut self, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
138        if let Some(id) = tokio::task::try_id() {
139            if id == self.recv_handle.id() {
140                return Err(cmd_err!(CmdErrorCode::Failed, "can't send with resp in recv task"));
141            }
142        }
143        let mut len = 0;
144        for b in body.iter() {
145            len += b.len();
146            log::trace!("client {:?} send2 cmd {:?} body: {}", self.tunnel_id, cmd, hex::encode(b));
147        }
148        log::trace!("client {:?} send2 cmd: {:?}, len {}", self.tunnel_id, cmd, len);
149        let seq = gen_seq();
150        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
151        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
152        let resp_id = gen_resp_id(cmd, seq);
153        let waiter = self.resp_waiter.clone();
154        let resp_waiter = waiter.create_timeout_result_future(resp_id, timeout)
155            .map_err(into_cmd_err!(CmdErrorCode::Failed, "create timeout result future error"))?;
156        let ret = self.send_inner2(buf.as_slice(), body).await;
157        if let Err(e) = ret {
158            self.set_disable();
159            return Err(e);
160        }
161        let resp = resp_waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "recv resp error"))?;
162        Ok(resp)
163    }
164
165    pub async fn send_cmd(&mut self, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
166        log::trace!("client {:?} send cmd: {:?}, len: {}", self.tunnel_id, cmd, body.len());
167        let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len()).unwrap());
168        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
169        let ret = self.send_inner_cmd(buf.as_slice(), body).await;
170        if let Err(e) = ret {
171            self.set_disable();
172            return Err(e);
173        }
174        Ok(())
175    }
176
177    pub async fn send_cmd_with_resp(&mut self, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
178        if let Some(id) = tokio::task::try_id() {
179            if id == self.recv_handle.id() {
180                return Err(cmd_err!(CmdErrorCode::Failed, "can't send with resp in recv task"));
181            }
182        }
183        log::trace!("client {:?} send cmd: {:?}, len: {}", self.tunnel_id, cmd, body.len());
184        let seq = gen_seq();
185        let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len()).unwrap());
186        let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
187        let resp_id = gen_resp_id(cmd, seq);
188        let waiter = self.resp_waiter.clone();
189        let resp_waiter = waiter.create_timeout_result_future(resp_id, timeout)
190            .map_err(into_cmd_err!(CmdErrorCode::Failed, "create timeout result future error"))?;
191        let ret = self.send_inner_cmd(buf.as_slice(), body).await;
192        if let Err(e) = ret {
193            self.set_disable();
194            return Err(e);
195        }
196        let resp = resp_waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "recv resp error"))?;
197        Ok(resp)
198    }
199
200    async fn send_inner(&mut self, header: &[u8], body: &[u8]) -> CmdResult<()> {
201        let mut write = self.write.get().await;
202        if header.len() > 255 {
203            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too long"));
204        }
205        write.write_u8(header.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
206        write.write_all(header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
207        write.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
208        write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
209        Ok(())
210    }
211
212    async fn send_inner2(&mut self, header: &[u8], body: &[&[u8]]) -> CmdResult<()> {
213        let mut write = self.write.get().await;
214        if header.len() > 255 {
215            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too long"));
216        }
217        write.write_u8(header.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
218        write.write_all(header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
219        for b in body.iter() {
220            write.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
221        }
222        write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
223        Ok(())
224    }
225
226    async fn send_inner_cmd(&mut self, header: &[u8], mut body: CmdBody) -> CmdResult<()> {
227        let mut write = self.write.get().await;
228        if header.len() > 255 {
229            return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
230        }
231        write.write_u8(header.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
232        write.write_all(header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
233        tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
234        write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
235        Ok(())
236    }
237}
238
239impl<M, R, W, LEN, CMD> Drop for CommonCmdSend<M, R, W, LEN, CMD>
240where M: CmdTunnelMeta,
241      R: CmdTunnelRead<M>,
242      W: CmdTunnelWrite<M>,
243      LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
244      CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
245    fn drop(&mut self) {
246        self.set_disable();
247    }
248}
249
250impl<M, R, W, LEN, CMD> CmdSend<M> for CommonCmdSend<M, R, W, LEN, CMD>
251where M: CmdTunnelMeta,
252      R: CmdTunnelRead<M>,
253      W: CmdTunnelWrite<M>,
254      LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
255      CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
256    fn get_tunnel_meta(&self) -> Option<Arc<M>> {
257        self.tunnel_meta.clone()
258    }
259
260    fn get_remote_peer_id(&self) -> PeerId {
261        self.remote_id.clone()
262    }
263}
264
265impl<M, R, W, LEN, CMD> ClassifiedWorker<TunnelId> for CommonCmdSend<M, R, W, LEN, CMD>
266where M: CmdTunnelMeta,
267      R: CmdTunnelRead<M>,
268      W: CmdTunnelWrite<M>,
269      LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
270      CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
271    fn is_work(&self) -> bool {
272        self.is_work && !self.recv_handle.is_finished()
273    }
274
275    fn is_valid(&self, c: TunnelId) -> bool {
276        self.tunnel_id == c
277    }
278
279    fn classification(&self) -> TunnelId {
280        self.tunnel_id
281    }
282}
283
284pub struct ClassifiedSendGuard<
285    C: WorkerClassification,
286    M: CmdTunnelMeta,
287    CW: ClassifiedWorker<C> + CmdSend<M>,
288    F: ClassifiedWorkerFactory<C, CW>> {
289    pub(crate) worker_guard: ClassifiedWorkerGuard<C, CW, F>,
290    pub(crate) _p: PhantomData<M>,
291}
292
293impl<
294    C: WorkerClassification,
295    M: CmdTunnelMeta,
296    CW: ClassifiedWorker<C> + CmdSend<M>,
297    F: ClassifiedWorkerFactory<C, CW>> Deref for ClassifiedSendGuard<C, M, CW, F> {
298    type Target = CW;
299
300    fn deref(&self) -> &Self::Target {
301        &self.worker_guard.deref()
302    }
303}
304
305impl<
306    C: WorkerClassification,
307    M: CmdTunnelMeta,
308    CW: ClassifiedWorker<C> + CmdSend<M>,
309    F: ClassifiedWorkerFactory<C, CW>> SendGuard<M, CW> for ClassifiedSendGuard<C, M, CW, F> {
310}
311
312pub struct CmdWriteFactory<M: CmdTunnelMeta,
313    R: CmdTunnelRead<M>,
314    W: CmdTunnelWrite<M>,
315    F: CmdTunnelFactory<M, R, W>,
316    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
317    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug> {
318    tunnel_factory: F,
319    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
320    resp_waiter: RespWaiterRef,
321    tunnel_id_generator: TunnelIdGenerator,
322    p: std::marker::PhantomData<Mutex<(R, W, M)>>,
323}
324
325impl<M: CmdTunnelMeta,
326    R: CmdTunnelRead<M>,
327    W: CmdTunnelWrite<M>,
328    F: CmdTunnelFactory<M, R, W>,
329    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
330    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug> CmdWriteFactory<M, R, W, F, LEN, CMD> {
331    pub(crate) fn new(tunnel_factory: F,
332                      cmd_handler: impl CmdHandler<LEN, CMD>,
333                      resp_waiter: RespWaiterRef,) -> Self {
334        Self {
335            tunnel_factory,
336            cmd_handler: Arc::new(cmd_handler),
337            resp_waiter,
338            tunnel_id_generator: TunnelIdGenerator::new(),
339            p: Default::default(),
340        }
341    }
342}
343
344#[async_trait::async_trait]
345impl<M: CmdTunnelMeta,
346    R: CmdTunnelRead<M>,
347    W: CmdTunnelWrite<M>,
348    F: CmdTunnelFactory<M, R, W>,
349    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
350    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug> ClassifiedWorkerFactory<TunnelId, CommonCmdSend<M, R, W, LEN, CMD>> for CmdWriteFactory<M, R, W, F, LEN, CMD> {
351    async fn create(&self, c: Option<TunnelId>) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
352        if c.is_some() {
353            return Err(pool_err!(PoolErrorCode::Failed, "tunnel {:?} not found", c.unwrap()));
354        }
355        let tunnel = self.tunnel_factory.create_tunnel().await.map_err(into_pool_err!(PoolErrorCode::Failed))?;
356        let peer_id = tunnel.get_remote_peer_id();
357        let tunnel_id = self.tunnel_id_generator.generate();
358        let (mut recv, write) = tunnel.split();
359        let remote_id = write.get_remote_peer_id();
360        let meta = write.get_tunnel_meta();
361        let write = ObjectHolder::new(write);
362        let resp_write = write.clone();
363        let cmd_handler = self.cmd_handler.clone();
364        let handle = spawn(async move {
365            let ret: CmdResult<()> = async move {
366                loop {
367                    let header_len = recv.read_u8().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
368                    let mut header = vec![0u8; header_len as usize];
369                    let n = recv.read_exact(header.as_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
370                    if n == 0 {
371                        break;
372                    }
373                    let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
374                    log::trace!("recv cmd {:?} from {} len {}", header.cmd_code(), peer_id.to_base58(), header.pkg_len().to_u64().unwrap());
375                    let body_len = header.pkg_len().to_u64().unwrap();
376                    let cmd_read = CmdBodyRead::new(recv, header.pkg_len().to_u64().unwrap() as usize);
377                    let waiter = cmd_read.get_waiter();
378                    let future = waiter.create_result_future().map_err(into_cmd_err!(CmdErrorCode::Failed))?;
379                    let version = header.version();
380                    let seq = header.seq();
381                    let cmd_code = header.cmd_code();
382                    match cmd_handler.handle(peer_id.clone(), tunnel_id, header, CmdBody::from_reader(BufReader::new(cmd_read), body_len)).await {
383                        Ok(Some(mut body)) => {
384                            let mut write = resp_write.get().await;
385                            let header = CmdHeader::<LEN, CMD>::new(version, true, seq, cmd_code, LEN::from_u64(body.len()).unwrap());
386                            let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
387                            if buf.len() > 255 {
388                                return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too long"));
389                            }
390                            write.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
391                            write.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
392                            tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
393                            write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
394                        }
395                        Ok(None) => {
396
397                        }
398                        Err(e) => {
399                            log::error!("handle cmd error: {:?}", e);
400                        }
401                    }
402                    recv = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
403                }
404                Ok(())
405            }.await;
406            ret
407        });
408        Ok(CommonCmdSend::new(tunnel_id, handle, write, self.resp_waiter.clone(), remote_id, meta))
409    }
410}
411
412pub struct DefaultCmdClient<M: CmdTunnelMeta,
413    R: CmdTunnelRead<M>,
414    W: CmdTunnelWrite<M>,
415    F: CmdTunnelFactory<M, R, W>,
416    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
417    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Eq + Hash + Debug> {
418    tunnel_pool: ClassifiedWorkerPoolRef<TunnelId, CommonCmdSend<M, R, W, LEN, CMD>, CmdWriteFactory<M, R, W, F, LEN, CMD>>,
419    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
420}
421
422impl<M: CmdTunnelMeta,
423    R: CmdTunnelRead<M>,
424    W: CmdTunnelWrite<M>,
425    F: CmdTunnelFactory<M, R, W>,
426    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
427    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Eq + Hash + Debug> DefaultCmdClient<M, R, W, F, LEN, CMD> {
428    pub fn new(factory: F, tunnel_count: u16) -> Arc<Self> {
429        let cmd_handler_map = Arc::new(CmdHandlerMap::new());
430        let handler_map = cmd_handler_map.clone();
431        let resp_waiter = Arc::new(RespWaiter::new());
432        let waiter = resp_waiter.clone();
433        Arc::new(Self {
434            tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, CmdWriteFactory::<M, R, W, _, LEN, CMD>::new(factory, move |peer_id: PeerId, tunnel_id: TunnelId, header: CmdHeader<LEN, CMD>, body_read: CmdBody| {
435                let handler_map = handler_map.clone();
436                let waiter = waiter.clone();
437                async move {
438                    if header.is_resp() && header.seq().is_some() {
439                        let resp_id = gen_resp_id(header.cmd_code(), header.seq().unwrap());
440                        let _ = waiter.set_result(resp_id, body_read);
441                        Ok(None)
442                    } else {
443                        if let Some(handler) = handler_map.get(header.cmd_code()) {
444                            handler.handle(peer_id, tunnel_id, header, body_read).await
445                        } else {
446                            Ok(None)
447                        }
448                    }
449                }
450            }, resp_waiter.clone())),
451            cmd_handler_map,
452        })
453    }
454
455    async fn get_send(&self) -> CmdResult<ClassifiedWorkerGuard<TunnelId, CommonCmdSend<M, R, W, LEN, CMD>, CmdWriteFactory<M, R, W, F, LEN, CMD>>> {
456        self.tunnel_pool.get_worker().await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
457    }
458
459    async fn get_send_of_tunnel_id(&self, tunnel_id: TunnelId) -> CmdResult<ClassifiedWorkerGuard<TunnelId, CommonCmdSend<M, R, W, LEN, CMD>, CmdWriteFactory<M, R, W, F, LEN, CMD>>> {
460        self.tunnel_pool.get_classified_worker(tunnel_id).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
461    }
462}
463
464pub type CmdClientSendGuard<M, R, W, F, LEN, CMD> = ClassifiedSendGuard<TunnelId, M, CommonCmdSend<M, R, W, LEN, CMD>, CmdWriteFactory<M, R, W, F, LEN, CMD>>;
465#[async_trait::async_trait]
466impl<M: CmdTunnelMeta,
467    R: CmdTunnelRead<M>,
468    W: CmdTunnelWrite<M>,
469    F: CmdTunnelFactory<M, R, W>,
470    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
471    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Eq + Hash + Debug,
472    > CmdClient<LEN, CMD, M, CommonCmdSend<M, R, W, LEN, CMD>, CmdClientSendGuard<M, R, W, F, LEN, CMD>, > for DefaultCmdClient<M, R, W, F, LEN, CMD> {
473    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
474        self.cmd_handler_map.insert(cmd, handler);
475    }
476
477    async fn send(&self, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
478        let mut send = self.get_send().await?;
479        send.send(cmd, version, body).await
480    }
481
482    async fn send_with_resp(&self, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
483        let mut send = self.get_send().await?;
484        send.send_with_resp(cmd, version, body, timeout).await
485    }
486
487    async fn send2(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
488        let mut send = self.get_send().await?;
489        send.send2(cmd, version, body).await
490    }
491
492    async fn send2_with_resp(&self, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
493        let mut send = self.get_send().await?;
494        send.send2_with_resp(cmd, version, body, timeout).await
495    }
496
497    async fn send_cmd(&self, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
498        let mut send = self.get_send().await?;
499        send.send_cmd(cmd, version, body).await
500    }
501
502    async fn send_cmd_with_resp(&self, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
503        let mut send = self.get_send().await?;
504        send.send_cmd_with_resp(cmd, version, body, timeout).await
505    }
506
507    async fn send_by_specify_tunnel(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
508        let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
509        send.send(cmd, version, body).await
510    }
511
512    async fn send_by_specify_tunnel_with_resp(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
513        let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
514        send.send_with_resp(cmd, version, body, timeout).await
515    }
516
517    async fn send2_by_specify_tunnel(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
518        let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
519        send.send2(cmd, version, body).await
520    }
521
522    async fn send2_by_specify_tunnel_with_resp(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
523        let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
524        send.send2_with_resp(cmd, version, body, timeout).await
525    }
526
527    async fn send_cmd_by_specify_tunnel(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
528        let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
529        send.send_cmd(cmd, version, body).await
530    }
531
532    async fn send_cmd_by_specify_tunnel_with_resp(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
533        let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
534        send.send_cmd_with_resp(cmd, version, body, timeout).await
535    }
536
537    async fn clear_all_tunnel(&self) {
538        self.tunnel_pool.clear_all_worker().await;
539    }
540
541    async fn get_send(&self, tunnel_id: TunnelId) -> CmdResult<CmdClientSendGuard<M, R, W, F, LEN, CMD>> {
542        Ok(ClassifiedSendGuard {
543            worker_guard: self.get_send_of_tunnel_id(tunnel_id).await?,
544            _p: Default::default(),
545        })
546    }
547}