Skip to main content

sfo_cmd_server/node/
node.rs

1use crate::client::{
2    CommonCmdSend, RespWaiter, RespWaiterRef, TrackedSendGuard, TunnelReserveResult,
3    TunnelRuntimeRegistry, gen_resp_id,
4};
5use crate::cmd::CmdHandlerMap;
6use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
7use crate::node::create_recv_handle;
8use crate::server::CmdTunnelListener;
9use crate::{
10    CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId,
11    TunnelId, TunnelIdGenerator, into_pool_err, pool_err,
12};
13use async_named_locker::ObjectHolder;
14use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
15use num::{FromPrimitive, ToPrimitive};
16use sfo_pool::{
17    ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
18    ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult,
19};
20use sfo_split::Splittable;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::hash::Hash;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26use tokio::task::yield_now;
27
28#[async_trait::async_trait]
29pub trait CmdNodeTunnelFactory<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
30    Send + Sync + 'static
31{
32    async fn create_tunnel(&self, remote_id: &PeerId) -> CmdResult<Splittable<R, W>>;
33}
34
35impl<M, R, W, LEN, CMD> ClassifiedWorker<(PeerId, Option<TunnelId>)>
36    for CommonCmdSend<M, R, W, LEN, CMD>
37where
38    M: CmdTunnelMeta,
39    R: CmdTunnelRead<M>,
40    W: CmdTunnelWrite<M>,
41    LEN: RawEncode
42        + for<'a> RawDecode<'a>
43        + Copy
44        + Send
45        + Sync
46        + 'static
47        + FromPrimitive
48        + ToPrimitive,
49    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
50{
51    fn is_work(&self) -> bool {
52        self.is_work && !self.recv_handle.is_finished()
53    }
54
55    fn is_valid(&self, c: (PeerId, Option<TunnelId>)) -> bool {
56        let (peer_id, tunnel_id) = c;
57        if tunnel_id.is_some() {
58            self.tunnel_id == tunnel_id.unwrap() && peer_id == self.remote_id
59        } else {
60            peer_id == self.remote_id
61        }
62    }
63
64    fn classification(&self) -> (PeerId, Option<TunnelId>) {
65        (self.remote_id.clone(), None)
66    }
67}
68
69struct CmdWriteFactoryImpl<
70    M: CmdTunnelMeta,
71    R: CmdTunnelRead<M>,
72    W: CmdTunnelWrite<M>,
73    F: CmdNodeTunnelFactory<M, R, W>,
74    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
75    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
76    LISTENER: CmdTunnelListener<M, R, W>,
77> {
78    tunnel_listener: LISTENER,
79    tunnel_factory: F,
80    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
81    tunnel_id_generator: TunnelIdGenerator,
82    resp_waiter: RespWaiterRef,
83    send_cache: Arc<Mutex<HashMap<PeerId, Vec<CommonCmdSend<M, R, W, LEN, CMD>>>>>,
84}
85
86impl<
87    M: CmdTunnelMeta,
88    R: CmdTunnelRead<M>,
89    W: CmdTunnelWrite<M>,
90    F: CmdNodeTunnelFactory<M, R, W>,
91    LEN: RawEncode
92        + for<'a> RawDecode<'a>
93        + Copy
94        + Send
95        + Sync
96        + 'static
97        + FromPrimitive
98        + ToPrimitive
99        + RawFixedBytes,
100    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
101    LISTENER: CmdTunnelListener<M, R, W>,
102> CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
103{
104    pub fn new(
105        tunnel_factory: F,
106        tunnel_listener: LISTENER,
107        cmd_handler: impl CmdHandler<LEN, CMD>,
108        resp_waiter: RespWaiterRef,
109    ) -> Self {
110        Self {
111            tunnel_listener,
112            tunnel_factory,
113            cmd_handler: Arc::new(cmd_handler),
114            tunnel_id_generator: TunnelIdGenerator::new(),
115            resp_waiter,
116            send_cache: Arc::new(Mutex::new(Default::default())),
117        }
118    }
119
120    pub fn start(self: &Arc<Self>) {
121        let this = self.clone();
122        tokio::spawn(async move {
123            if let Err(e) = this.run().await {
124                log::error!("cmd server error: {:?}", e);
125            }
126        });
127    }
128
129    async fn run(self: &Arc<Self>) -> CmdResult<()> {
130        loop {
131            let tunnel = self.tunnel_listener.accept().await?;
132            let peer_id = tunnel.get_remote_peer_id();
133            let tunnel_id = self.tunnel_id_generator.generate();
134            let resp_waiter = self.resp_waiter.clone();
135            let this = self.clone();
136            tokio::spawn(async move {
137                let ret: CmdResult<()> = async move {
138                    let this = this.clone();
139                    let cmd_handler = this.cmd_handler.clone();
140                    let (reader, writer) = tunnel.split();
141                    let remote_id = reader.get_remote_peer_id();
142                    let tunnel_meta = reader.get_tunnel_meta();
143                    let writer = ObjectHolder::new(writer);
144                    let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
145                        reader,
146                        writer.clone(),
147                        tunnel_id,
148                        cmd_handler,
149                    );
150                    {
151                        let mut send_cache = this.send_cache.lock().unwrap();
152                        let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
153                        send_list.push(CommonCmdSend::new(
154                            tunnel_id,
155                            recv_handle,
156                            writer,
157                            resp_waiter,
158                            remote_id,
159                            tunnel_meta,
160                        ));
161                    }
162                    Ok(())
163                }
164                .await;
165                if let Err(e) = ret {
166                    log::error!("peer connection error: {:?}", e);
167                }
168            });
169        }
170    }
171}
172
173#[async_trait::async_trait]
174impl<
175    M: CmdTunnelMeta,
176    R: CmdTunnelRead<M>,
177    W: CmdTunnelWrite<M>,
178    F: CmdNodeTunnelFactory<M, R, W>,
179    LEN: RawEncode
180        + for<'a> RawDecode<'a>
181        + Copy
182        + Send
183        + Sync
184        + 'static
185        + FromPrimitive
186        + ToPrimitive
187        + RawFixedBytes,
188    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
189    LISTENER: CmdTunnelListener<M, R, W>,
190> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
191    for CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
192{
193    async fn create(
194        &self,
195        c: Option<(PeerId, Option<TunnelId>)>,
196    ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
197        if c.is_some() {
198            let (peer_id, tunnel_id) = c.unwrap();
199            if tunnel_id.is_some() {
200                let mut send_cache = self.send_cache.lock().unwrap();
201                if let Some(send_list) = send_cache.get_mut(&peer_id) {
202                    let mut send_index = None;
203                    for (index, send) in send_list.iter().enumerate() {
204                        if send.get_tunnel_id() == tunnel_id.unwrap() {
205                            send_index = Some(index);
206                            break;
207                        }
208                    }
209                    if let Some(send_index) = send_index {
210                        let send = send_list.remove(send_index);
211                        Ok(send)
212                    } else {
213                        Err(pool_err!(
214                            PoolErrorCode::Failed,
215                            "tunnel {:?} not found",
216                            tunnel_id.unwrap()
217                        ))
218                    }
219                } else {
220                    Err(pool_err!(
221                        PoolErrorCode::Failed,
222                        "tunnel {:?} not found",
223                        tunnel_id.unwrap()
224                    ))
225                }
226            } else {
227                {
228                    let mut send_cache = self.send_cache.lock().unwrap();
229                    if let Some(send_list) = send_cache.get_mut(&peer_id) {
230                        if !send_list.is_empty() {
231                            let send = send_list.pop().unwrap();
232                            if send_list.is_empty() {
233                                send_cache.remove(&peer_id);
234                            }
235                            return Ok(send);
236                        }
237                    }
238                }
239                let tunnel = self
240                    .tunnel_factory
241                    .create_tunnel(&peer_id)
242                    .await
243                    .map_err(into_pool_err!(PoolErrorCode::Failed))?;
244                let tunnel_id = self.tunnel_id_generator.generate();
245                let (recv, write) = tunnel.split();
246                let remote_id = recv.get_remote_peer_id();
247                let tunnel_meta = recv.get_tunnel_meta();
248                let write = ObjectHolder::new(write);
249                let cmd_handler = self.cmd_handler.clone();
250                let handle = create_recv_handle::<M, R, W, LEN, CMD>(
251                    recv,
252                    write.clone(),
253                    tunnel_id,
254                    cmd_handler,
255                );
256                Ok(CommonCmdSend::new(
257                    tunnel_id,
258                    handle,
259                    write,
260                    self.resp_waiter.clone(),
261                    remote_id,
262                    tunnel_meta,
263                ))
264            }
265        } else {
266            Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
267        }
268    }
269}
270
271pub struct CmdNodeWriteFactory<
272    M: CmdTunnelMeta,
273    R: CmdTunnelRead<M>,
274    W: CmdTunnelWrite<M>,
275    F: CmdNodeTunnelFactory<M, R, W>,
276    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
277    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
278    LISTENER: CmdTunnelListener<M, R, W>,
279> {
280    inner: Arc<CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>>,
281}
282
283impl<
284    M: CmdTunnelMeta,
285    R: CmdTunnelRead<M>,
286    W: CmdTunnelWrite<M>,
287    F: CmdNodeTunnelFactory<M, R, W>,
288    LEN: RawEncode
289        + for<'a> RawDecode<'a>
290        + Copy
291        + Send
292        + Sync
293        + 'static
294        + FromPrimitive
295        + ToPrimitive
296        + RawFixedBytes,
297    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
298    LISTENER: CmdTunnelListener<M, R, W>,
299> CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
300{
301    pub(crate) fn new(
302        tunnel_factory: F,
303        tunnel_listener: LISTENER,
304        cmd_handler: impl CmdHandler<LEN, CMD>,
305        resp_waiter: RespWaiterRef,
306    ) -> Self {
307        Self {
308            inner: Arc::new(CmdWriteFactoryImpl::new(
309                tunnel_factory,
310                tunnel_listener,
311                cmd_handler,
312                resp_waiter,
313            )),
314        }
315    }
316
317    pub fn start(&self) {
318        self.inner.start();
319    }
320}
321
322#[async_trait::async_trait]
323impl<
324    M: CmdTunnelMeta,
325    R: CmdTunnelRead<M>,
326    W: CmdTunnelWrite<M>,
327    F: CmdNodeTunnelFactory<M, R, W>,
328    LEN: RawEncode
329        + for<'a> RawDecode<'a>
330        + Copy
331        + Send
332        + Sync
333        + 'static
334        + FromPrimitive
335        + ToPrimitive
336        + RawFixedBytes,
337    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
338    LISTENER: CmdTunnelListener<M, R, W>,
339> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
340    for CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
341{
342    async fn create(
343        &self,
344        c: Option<(PeerId, Option<TunnelId>)>,
345    ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
346        self.inner.create(c).await
347    }
348}
349pub struct DefaultCmdNode<
350    M: CmdTunnelMeta,
351    R: CmdTunnelRead<M>,
352    W: CmdTunnelWrite<M>,
353    F: CmdNodeTunnelFactory<M, R, W>,
354    LEN: RawEncode
355        + for<'a> RawDecode<'a>
356        + Copy
357        + Send
358        + Sync
359        + 'static
360        + FromPrimitive
361        + ToPrimitive
362        + RawFixedBytes,
363    CMD: RawEncode
364        + for<'a> RawDecode<'a>
365        + Copy
366        + Send
367        + Sync
368        + 'static
369        + RawFixedBytes
370        + Eq
371        + Hash
372        + Debug,
373    LISTENER: CmdTunnelListener<M, R, W>,
374> {
375    tunnel_pool: ClassifiedWorkerPoolRef<
376        (PeerId, Option<TunnelId>),
377        CommonCmdSend<M, R, W, LEN, CMD>,
378        CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
379    >,
380    runtime_tunnels: Arc<TunnelRuntimeRegistry>,
381    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
382}
383
384impl<
385    M: CmdTunnelMeta,
386    R: CmdTunnelRead<M>,
387    W: CmdTunnelWrite<M>,
388    F: CmdNodeTunnelFactory<M, R, W>,
389    LEN: RawEncode
390        + for<'a> RawDecode<'a>
391        + Copy
392        + Send
393        + Sync
394        + 'static
395        + FromPrimitive
396        + ToPrimitive
397        + RawFixedBytes,
398    CMD: RawEncode
399        + for<'a> RawDecode<'a>
400        + Copy
401        + Send
402        + Sync
403        + 'static
404        + RawFixedBytes
405        + Eq
406        + Hash
407        + Debug,
408    LISTENER: CmdTunnelListener<M, R, W>,
409> DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
410{
411    fn tunnel_not_found(tunnel_id: TunnelId) -> sfo_result::Error<CmdErrorCode> {
412        crate::errors::cmd_err!(CmdErrorCode::Failed, "tunnel {:?} not found", tunnel_id)
413    }
414
415    pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
416        let cmd_handler_map = Arc::new(CmdHandlerMap::new());
417        let handler_map = cmd_handler_map.clone();
418        let resp_waiter = Arc::new(RespWaiter::new());
419        let waiter = resp_waiter.clone();
420        let write_factory = CmdNodeWriteFactory::<M, R, W, _, LEN, CMD, LISTENER>::new(
421            factory,
422            listener,
423            move |local_id: PeerId,
424                  peer_id: PeerId,
425                  tunnel_id: TunnelId,
426                  header: CmdHeader<LEN, CMD>,
427                  body_read: CmdBody| {
428                let handler_map = handler_map.clone();
429                let waiter = waiter.clone();
430                async move {
431                    if header.is_resp() && header.seq().is_some() {
432                        let resp_id =
433                            gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
434                        let _ = waiter.set_result(resp_id, body_read);
435                        Ok(None)
436                    } else {
437                        if let Some(handler) = handler_map.get(header.cmd_code()) {
438                            handler
439                                .handle(local_id, peer_id, tunnel_id, header, body_read)
440                                .await
441                        } else {
442                            Ok(None)
443                        }
444                    }
445                }
446            },
447            resp_waiter.clone(),
448        );
449        write_factory.start();
450        Arc::new(Self {
451            tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
452            runtime_tunnels: TunnelRuntimeRegistry::new(),
453            cmd_handler_map,
454        })
455    }
456
457    fn tracked_send_guard(
458        &self,
459        worker_guard: ClassifiedWorkerGuard<
460            (PeerId, Option<TunnelId>),
461            CommonCmdSend<M, R, W, LEN, CMD>,
462            CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
463        >,
464    ) -> CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER> {
465        let tunnel_id = worker_guard.get_tunnel_id();
466        TrackedSendGuard::new(worker_guard, self.runtime_tunnels.clone(), tunnel_id)
467    }
468
469    async fn reserve_tunnel(&self, tunnel_id: TunnelId) -> CmdResult<()> {
470        loop {
471            match self.runtime_tunnels.reserve_existing(tunnel_id) {
472                TunnelReserveResult::Acquired => return Ok(()),
473                TunnelReserveResult::Wait(waiter) => waiter.notified().await,
474                TunnelReserveResult::Missing => return Err(Self::tunnel_not_found(tunnel_id)),
475            }
476        }
477    }
478
479    async fn get_send(
480        &self,
481        peer_id: PeerId,
482    ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
483        loop {
484            let worker_guard = self
485                .tunnel_pool
486                .get_classified_worker((peer_id.clone(), None))
487                .await
488                .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))?;
489            let tunnel_id = worker_guard.get_tunnel_id();
490            if self.runtime_tunnels.mark_borrowed(tunnel_id) {
491                return Ok(self.tracked_send_guard(worker_guard));
492            }
493            drop(worker_guard);
494            yield_now().await;
495        }
496    }
497
498    async fn get_send_of_tunnel_id(
499        &self,
500        peer_id: PeerId,
501        tunnel_id: TunnelId,
502    ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
503        self.reserve_tunnel(tunnel_id).await?;
504        match self
505            .tunnel_pool
506            .get_classified_worker((peer_id, Some(tunnel_id)))
507            .await
508        {
509            Ok(worker_guard) => Ok(self.tracked_send_guard(worker_guard)),
510            Err(_) => {
511                self.runtime_tunnels.remove(tunnel_id);
512                Err(Self::tunnel_not_found(tunnel_id))
513            }
514        }
515    }
516}
517
/// Send guard type returned by [`DefaultCmdNode`]: a [`TrackedSendGuard`] over a pooled
/// [`CommonCmdSend`] worker produced by [`CmdNodeWriteFactory`].
pub type CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER> = TrackedSendGuard<
    (PeerId, Option<TunnelId>),
    M,
    CommonCmdSend<M, R, W, LEN, CMD>,
    CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
>;
524#[async_trait::async_trait]
525impl<
526    M: CmdTunnelMeta,
527    R: CmdTunnelRead<M>,
528    W: CmdTunnelWrite<M>,
529    F: CmdNodeTunnelFactory<M, R, W>,
530    LEN: RawEncode
531        + for<'a> RawDecode<'a>
532        + Copy
533        + RawFixedBytes
534        + Sync
535        + Send
536        + 'static
537        + FromPrimitive
538        + ToPrimitive,
539    CMD: RawEncode
540        + for<'a> RawDecode<'a>
541        + Copy
542        + RawFixedBytes
543        + Sync
544        + Send
545        + 'static
546        + Eq
547        + Hash
548        + Debug,
549    LISTENER: CmdTunnelListener<M, R, W>,
550>
551    CmdNode<
552        LEN,
553        CMD,
554        M,
555        CommonCmdSend<M, R, W, LEN, CMD>,
556        CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>,
557    > for DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
558{
559    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
560        self.cmd_handler_map.insert(cmd, handler);
561    }
562
563    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
564        let mut send = self.get_send(peer_id.clone()).await?;
565        send.send(cmd, version, body).await
566    }
567
568    async fn send_with_resp(
569        &self,
570        peer_id: &PeerId,
571        cmd: CMD,
572        version: u8,
573        body: &[u8],
574        timeout: Duration,
575    ) -> CmdResult<CmdBody> {
576        let mut send = self.get_send(peer_id.clone()).await?;
577        send.send_with_resp(cmd, version, body, timeout).await
578    }
579
580    async fn send_parts(
581        &self,
582        peer_id: &PeerId,
583        cmd: CMD,
584        version: u8,
585        body: &[&[u8]],
586    ) -> CmdResult<()> {
587        let mut send = self.get_send(peer_id.clone()).await?;
588        send.send_parts(cmd, version, body).await
589    }
590
591    async fn send_parts_with_resp(
592        &self,
593        peer_id: &PeerId,
594        cmd: CMD,
595        version: u8,
596        body: &[&[u8]],
597        timeout: Duration,
598    ) -> CmdResult<CmdBody> {
599        let mut send = self.get_send(peer_id.clone()).await?;
600        send.send_parts_with_resp(cmd, version, body, timeout).await
601    }
602
603    async fn send_cmd(
604        &self,
605        peer_id: &PeerId,
606        cmd: CMD,
607        version: u8,
608        body: CmdBody,
609    ) -> CmdResult<()> {
610        let mut send = self.get_send(peer_id.clone()).await?;
611        send.send_cmd(cmd, version, body).await
612    }
613
614    async fn send_cmd_with_resp(
615        &self,
616        peer_id: &PeerId,
617        cmd: CMD,
618        version: u8,
619        body: CmdBody,
620        timeout: Duration,
621    ) -> CmdResult<CmdBody> {
622        let mut send = self.get_send(peer_id.clone()).await?;
623        send.send_cmd_with_resp(cmd, version, body, timeout).await
624    }
625
626    async fn send_by_specify_tunnel(
627        &self,
628        peer_id: &PeerId,
629        tunnel_id: TunnelId,
630        cmd: CMD,
631        version: u8,
632        body: &[u8],
633    ) -> CmdResult<()> {
634        let mut send = self
635            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
636            .await?;
637        send.send(cmd, version, body).await
638    }
639
640    async fn send_by_specify_tunnel_with_resp(
641        &self,
642        peer_id: &PeerId,
643        tunnel_id: TunnelId,
644        cmd: CMD,
645        version: u8,
646        body: &[u8],
647        timeout: Duration,
648    ) -> CmdResult<CmdBody> {
649        let mut send = self
650            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
651            .await?;
652        send.send_with_resp(cmd, version, body, timeout).await
653    }
654
655    async fn send_parts_by_specify_tunnel(
656        &self,
657        peer_id: &PeerId,
658        tunnel_id: TunnelId,
659        cmd: CMD,
660        version: u8,
661        body: &[&[u8]],
662    ) -> CmdResult<()> {
663        let mut send = self
664            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
665            .await?;
666        send.send_parts(cmd, version, body).await
667    }
668
669    async fn send_parts_by_specify_tunnel_with_resp(
670        &self,
671        peer_id: &PeerId,
672        tunnel_id: TunnelId,
673        cmd: CMD,
674        version: u8,
675        body: &[&[u8]],
676        timeout: Duration,
677    ) -> CmdResult<CmdBody> {
678        let mut send = self
679            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
680            .await?;
681        send.send_parts_with_resp(cmd, version, body, timeout).await
682    }
683
684    async fn send_cmd_by_specify_tunnel(
685        &self,
686        peer_id: &PeerId,
687        tunnel_id: TunnelId,
688        cmd: CMD,
689        version: u8,
690        body: CmdBody,
691    ) -> CmdResult<()> {
692        let mut send = self
693            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
694            .await?;
695        send.send_cmd(cmd, version, body).await
696    }
697
698    async fn send_cmd_by_specify_tunnel_with_resp(
699        &self,
700        peer_id: &PeerId,
701        tunnel_id: TunnelId,
702        cmd: CMD,
703        version: u8,
704        body: CmdBody,
705        timeout: Duration,
706    ) -> CmdResult<CmdBody> {
707        let mut send = self
708            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
709            .await?;
710        send.send_cmd_with_resp(cmd, version, body, timeout).await
711    }
712
713    async fn clear_all_tunnel(&self) {
714        self.tunnel_pool.clear_all_worker().await;
715        self.runtime_tunnels.clear();
716    }
717
718    async fn get_send(
719        &self,
720        peer_id: &PeerId,
721        tunnel_id: TunnelId,
722    ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
723        self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await
724    }
725}