Skip to main content

sfo_cmd_server/node/
node.rs

1use crate::client::{ClassifiedSendGuard, CommonCmdSend, RespWaiter, RespWaiterRef, gen_resp_id};
2use crate::cmd::CmdHandlerMap;
3use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
4use crate::node::create_recv_handle;
5use crate::server::CmdTunnelListener;
6use crate::{
7    CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId,
8    TunnelId, TunnelIdGenerator, into_pool_err, pool_err,
9};
10use async_named_locker::ObjectHolder;
11use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
12use num::{FromPrimitive, ToPrimitive};
13use sfo_pool::{
14    ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
15    ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult,
16};
17use sfo_split::Splittable;
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::hash::Hash;
21use std::sync::{Arc, Mutex};
22use std::time::Duration;
23
24#[async_trait::async_trait]
25pub trait CmdNodeTunnelFactory<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
26    Send + Sync + 'static
27{
28    async fn create_tunnel(&self, remote_id: &PeerId) -> CmdResult<Splittable<R, W>>;
29}
30
31impl<M, R, W, LEN, CMD> ClassifiedWorker<(PeerId, Option<TunnelId>)>
32    for CommonCmdSend<M, R, W, LEN, CMD>
33where
34    M: CmdTunnelMeta,
35    R: CmdTunnelRead<M>,
36    W: CmdTunnelWrite<M>,
37    LEN: RawEncode
38        + for<'a> RawDecode<'a>
39        + Copy
40        + Send
41        + Sync
42        + 'static
43        + FromPrimitive
44        + ToPrimitive,
45    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
46{
47    fn is_work(&self) -> bool {
48        self.is_work && !self.recv_handle.is_finished()
49    }
50
51    fn is_valid(&self, c: (PeerId, Option<TunnelId>)) -> bool {
52        let (peer_id, tunnel_id) = c;
53        if tunnel_id.is_some() {
54            self.tunnel_id == tunnel_id.unwrap() && peer_id == self.remote_id
55        } else {
56            peer_id == self.remote_id
57        }
58    }
59
60    fn classification(&self) -> (PeerId, Option<TunnelId>) {
61        (self.remote_id.clone(), Some(self.tunnel_id))
62    }
63}
64
65struct CmdWriteFactoryImpl<
66    M: CmdTunnelMeta,
67    R: CmdTunnelRead<M>,
68    W: CmdTunnelWrite<M>,
69    F: CmdNodeTunnelFactory<M, R, W>,
70    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
71    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
72    LISTENER: CmdTunnelListener<M, R, W>,
73> {
74    tunnel_listener: LISTENER,
75    tunnel_factory: F,
76    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
77    tunnel_id_generator: TunnelIdGenerator,
78    resp_waiter: RespWaiterRef,
79    send_cache: Arc<Mutex<HashMap<PeerId, Vec<CommonCmdSend<M, R, W, LEN, CMD>>>>>,
80}
81
82impl<
83    M: CmdTunnelMeta,
84    R: CmdTunnelRead<M>,
85    W: CmdTunnelWrite<M>,
86    F: CmdNodeTunnelFactory<M, R, W>,
87    LEN: RawEncode
88        + for<'a> RawDecode<'a>
89        + Copy
90        + Send
91        + Sync
92        + 'static
93        + FromPrimitive
94        + ToPrimitive
95        + RawFixedBytes,
96    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
97    LISTENER: CmdTunnelListener<M, R, W>,
98> CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
99{
100    pub fn new(
101        tunnel_factory: F,
102        tunnel_listener: LISTENER,
103        cmd_handler: impl CmdHandler<LEN, CMD>,
104        resp_waiter: RespWaiterRef,
105    ) -> Self {
106        Self {
107            tunnel_listener,
108            tunnel_factory,
109            cmd_handler: Arc::new(cmd_handler),
110            tunnel_id_generator: TunnelIdGenerator::new(),
111            resp_waiter,
112            send_cache: Arc::new(Mutex::new(Default::default())),
113        }
114    }
115
116    pub fn start(self: &Arc<Self>) {
117        let this = self.clone();
118        tokio::spawn(async move {
119            if let Err(e) = this.run().await {
120                log::error!("cmd server error: {:?}", e);
121            }
122        });
123    }
124
125    async fn run(self: &Arc<Self>) -> CmdResult<()> {
126        loop {
127            let tunnel = self.tunnel_listener.accept().await?;
128            let peer_id = tunnel.get_remote_peer_id();
129            let tunnel_id = self.tunnel_id_generator.generate();
130            let resp_waiter = self.resp_waiter.clone();
131            let this = self.clone();
132            tokio::spawn(async move {
133                let ret: CmdResult<()> = async move {
134                    let this = this.clone();
135                    let cmd_handler = this.cmd_handler.clone();
136                    let (reader, writer) = tunnel.split();
137                    let remote_id = reader.get_remote_peer_id();
138                    let tunnel_meta = reader.get_tunnel_meta();
139                    let writer = ObjectHolder::new(writer);
140                    let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
141                        reader,
142                        writer.clone(),
143                        tunnel_id,
144                        cmd_handler,
145                    );
146                    {
147                        let mut send_cache = this.send_cache.lock().unwrap();
148                        let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
149                        send_list.push(CommonCmdSend::new(
150                            tunnel_id,
151                            recv_handle,
152                            writer,
153                            resp_waiter,
154                            remote_id,
155                            tunnel_meta,
156                        ));
157                    }
158                    Ok(())
159                }
160                .await;
161                if let Err(e) = ret {
162                    log::error!("peer connection error: {:?}", e);
163                }
164            });
165        }
166    }
167}
168
169#[async_trait::async_trait]
170impl<
171    M: CmdTunnelMeta,
172    R: CmdTunnelRead<M>,
173    W: CmdTunnelWrite<M>,
174    F: CmdNodeTunnelFactory<M, R, W>,
175    LEN: RawEncode
176        + for<'a> RawDecode<'a>
177        + Copy
178        + Send
179        + Sync
180        + 'static
181        + FromPrimitive
182        + ToPrimitive
183        + RawFixedBytes,
184    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
185    LISTENER: CmdTunnelListener<M, R, W>,
186> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
187    for CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
188{
189    async fn create(
190        &self,
191        c: Option<(PeerId, Option<TunnelId>)>,
192    ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
193        if c.is_some() {
194            let (peer_id, tunnel_id) = c.unwrap();
195            if tunnel_id.is_some() {
196                let mut send_cache = self.send_cache.lock().unwrap();
197                if let Some(send_list) = send_cache.get_mut(&peer_id) {
198                    let mut send_index = None;
199                    for (index, send) in send_list.iter().enumerate() {
200                        if send.get_tunnel_id() == tunnel_id.unwrap() {
201                            send_index = Some(index);
202                            break;
203                        }
204                    }
205                    if let Some(send_index) = send_index {
206                        let send = send_list.remove(send_index);
207                        Ok(send)
208                    } else {
209                        Err(pool_err!(
210                            PoolErrorCode::Failed,
211                            "tunnel {:?} not found",
212                            tunnel_id.unwrap()
213                        ))
214                    }
215                } else {
216                    Err(pool_err!(
217                        PoolErrorCode::Failed,
218                        "tunnel {:?} not found",
219                        tunnel_id.unwrap()
220                    ))
221                }
222            } else {
223                {
224                    let mut send_cache = self.send_cache.lock().unwrap();
225                    if let Some(send_list) = send_cache.get_mut(&peer_id) {
226                        if !send_list.is_empty() {
227                            let send = send_list.pop().unwrap();
228                            if send_list.is_empty() {
229                                send_cache.remove(&peer_id);
230                            }
231                            return Ok(send);
232                        }
233                    }
234                }
235                let tunnel = self
236                    .tunnel_factory
237                    .create_tunnel(&peer_id)
238                    .await
239                    .map_err(into_pool_err!(PoolErrorCode::Failed))?;
240                let tunnel_id = self.tunnel_id_generator.generate();
241                let (recv, write) = tunnel.split();
242                let remote_id = recv.get_remote_peer_id();
243                let tunnel_meta = recv.get_tunnel_meta();
244                let write = ObjectHolder::new(write);
245                let cmd_handler = self.cmd_handler.clone();
246                let handle = create_recv_handle::<M, R, W, LEN, CMD>(
247                    recv,
248                    write.clone(),
249                    tunnel_id,
250                    cmd_handler,
251                );
252                Ok(CommonCmdSend::new(
253                    tunnel_id,
254                    handle,
255                    write,
256                    self.resp_waiter.clone(),
257                    remote_id,
258                    tunnel_meta,
259                ))
260            }
261        } else {
262            Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
263        }
264    }
265}
266
267pub struct CmdNodeWriteFactory<
268    M: CmdTunnelMeta,
269    R: CmdTunnelRead<M>,
270    W: CmdTunnelWrite<M>,
271    F: CmdNodeTunnelFactory<M, R, W>,
272    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
273    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
274    LISTENER: CmdTunnelListener<M, R, W>,
275> {
276    inner: Arc<CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>>,
277}
278
279impl<
280    M: CmdTunnelMeta,
281    R: CmdTunnelRead<M>,
282    W: CmdTunnelWrite<M>,
283    F: CmdNodeTunnelFactory<M, R, W>,
284    LEN: RawEncode
285        + for<'a> RawDecode<'a>
286        + Copy
287        + Send
288        + Sync
289        + 'static
290        + FromPrimitive
291        + ToPrimitive
292        + RawFixedBytes,
293    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
294    LISTENER: CmdTunnelListener<M, R, W>,
295> CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
296{
297    pub(crate) fn new(
298        tunnel_factory: F,
299        tunnel_listener: LISTENER,
300        cmd_handler: impl CmdHandler<LEN, CMD>,
301        resp_waiter: RespWaiterRef,
302    ) -> Self {
303        Self {
304            inner: Arc::new(CmdWriteFactoryImpl::new(
305                tunnel_factory,
306                tunnel_listener,
307                cmd_handler,
308                resp_waiter,
309            )),
310        }
311    }
312
313    pub fn start(&self) {
314        self.inner.start();
315    }
316}
317
318#[async_trait::async_trait]
319impl<
320    M: CmdTunnelMeta,
321    R: CmdTunnelRead<M>,
322    W: CmdTunnelWrite<M>,
323    F: CmdNodeTunnelFactory<M, R, W>,
324    LEN: RawEncode
325        + for<'a> RawDecode<'a>
326        + Copy
327        + Send
328        + Sync
329        + 'static
330        + FromPrimitive
331        + ToPrimitive
332        + RawFixedBytes,
333    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
334    LISTENER: CmdTunnelListener<M, R, W>,
335> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
336    for CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
337{
338    async fn create(
339        &self,
340        c: Option<(PeerId, Option<TunnelId>)>,
341    ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
342        self.inner.create(c).await
343    }
344}
345pub struct DefaultCmdNode<
346    M: CmdTunnelMeta,
347    R: CmdTunnelRead<M>,
348    W: CmdTunnelWrite<M>,
349    F: CmdNodeTunnelFactory<M, R, W>,
350    LEN: RawEncode
351        + for<'a> RawDecode<'a>
352        + Copy
353        + Send
354        + Sync
355        + 'static
356        + FromPrimitive
357        + ToPrimitive
358        + RawFixedBytes,
359    CMD: RawEncode
360        + for<'a> RawDecode<'a>
361        + Copy
362        + Send
363        + Sync
364        + 'static
365        + RawFixedBytes
366        + Eq
367        + Hash
368        + Debug,
369    LISTENER: CmdTunnelListener<M, R, W>,
370> {
371    tunnel_pool: ClassifiedWorkerPoolRef<
372        (PeerId, Option<TunnelId>),
373        CommonCmdSend<M, R, W, LEN, CMD>,
374        CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
375    >,
376    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
377}
378
379impl<
380    M: CmdTunnelMeta,
381    R: CmdTunnelRead<M>,
382    W: CmdTunnelWrite<M>,
383    F: CmdNodeTunnelFactory<M, R, W>,
384    LEN: RawEncode
385        + for<'a> RawDecode<'a>
386        + Copy
387        + Send
388        + Sync
389        + 'static
390        + FromPrimitive
391        + ToPrimitive
392        + RawFixedBytes,
393    CMD: RawEncode
394        + for<'a> RawDecode<'a>
395        + Copy
396        + Send
397        + Sync
398        + 'static
399        + RawFixedBytes
400        + Eq
401        + Hash
402        + Debug,
403    LISTENER: CmdTunnelListener<M, R, W>,
404> DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
405{
406    pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
407        let cmd_handler_map = Arc::new(CmdHandlerMap::new());
408        let handler_map = cmd_handler_map.clone();
409        let resp_waiter = Arc::new(RespWaiter::new());
410        let waiter = resp_waiter.clone();
411        let write_factory = CmdNodeWriteFactory::<M, R, W, _, LEN, CMD, LISTENER>::new(
412            factory,
413            listener,
414            move |local_id: PeerId,
415                  peer_id: PeerId,
416                  tunnel_id: TunnelId,
417                  header: CmdHeader<LEN, CMD>,
418                  body_read: CmdBody| {
419                let handler_map = handler_map.clone();
420                let waiter = waiter.clone();
421                async move {
422                    if header.is_resp() && header.seq().is_some() {
423                        let resp_id =
424                            gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
425                        let _ = waiter.set_result(resp_id, body_read);
426                        Ok(None)
427                    } else {
428                        if let Some(handler) = handler_map.get(header.cmd_code()) {
429                            handler
430                                .handle(local_id, peer_id, tunnel_id, header, body_read)
431                                .await
432                        } else {
433                            Ok(None)
434                        }
435                    }
436                }
437            },
438            resp_waiter.clone(),
439        );
440        write_factory.start();
441        Arc::new(Self {
442            tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
443            cmd_handler_map,
444        })
445    }
446
447    async fn get_send(
448        &self,
449        peer_id: PeerId,
450    ) -> CmdResult<
451        ClassifiedWorkerGuard<
452            (PeerId, Option<TunnelId>),
453            CommonCmdSend<M, R, W, LEN, CMD>,
454            CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
455        >,
456    > {
457        self.tunnel_pool
458            .get_classified_worker((peer_id, None))
459            .await
460            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
461    }
462
463    async fn get_send_of_tunnel_id(
464        &self,
465        peer_id: PeerId,
466        tunnel_id: TunnelId,
467    ) -> CmdResult<
468        ClassifiedWorkerGuard<
469            (PeerId, Option<TunnelId>),
470            CommonCmdSend<M, R, W, LEN, CMD>,
471            CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
472        >,
473    > {
474        self.tunnel_pool
475            .get_classified_worker((peer_id, Some(tunnel_id)))
476            .await
477            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
478    }
479}
480
481pub type CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
482    (PeerId, Option<TunnelId>),
483    M,
484    CommonCmdSend<M, R, W, LEN, CMD>,
485    CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
486>;
487#[async_trait::async_trait]
488impl<
489    M: CmdTunnelMeta,
490    R: CmdTunnelRead<M>,
491    W: CmdTunnelWrite<M>,
492    F: CmdNodeTunnelFactory<M, R, W>,
493    LEN: RawEncode
494        + for<'a> RawDecode<'a>
495        + Copy
496        + RawFixedBytes
497        + Sync
498        + Send
499        + 'static
500        + FromPrimitive
501        + ToPrimitive,
502    CMD: RawEncode
503        + for<'a> RawDecode<'a>
504        + Copy
505        + RawFixedBytes
506        + Sync
507        + Send
508        + 'static
509        + Eq
510        + Hash
511        + Debug,
512    LISTENER: CmdTunnelListener<M, R, W>,
513>
514    CmdNode<
515        LEN,
516        CMD,
517        M,
518        CommonCmdSend<M, R, W, LEN, CMD>,
519        CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>,
520    > for DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
521{
522    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
523        self.cmd_handler_map.insert(cmd, handler);
524    }
525
526    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
527        let mut send = self.get_send(peer_id.clone()).await?;
528        send.send(cmd, version, body).await
529    }
530
531    async fn send_with_resp(
532        &self,
533        peer_id: &PeerId,
534        cmd: CMD,
535        version: u8,
536        body: &[u8],
537        timeout: Duration,
538    ) -> CmdResult<CmdBody> {
539        let mut send = self.get_send(peer_id.clone()).await?;
540        send.send_with_resp(cmd, version, body, timeout).await
541    }
542
543    async fn send2(
544        &self,
545        peer_id: &PeerId,
546        cmd: CMD,
547        version: u8,
548        body: &[&[u8]],
549    ) -> CmdResult<()> {
550        let mut send = self.get_send(peer_id.clone()).await?;
551        send.send2(cmd, version, body).await
552    }
553
554    async fn send2_with_resp(
555        &self,
556        peer_id: &PeerId,
557        cmd: CMD,
558        version: u8,
559        body: &[&[u8]],
560        timeout: Duration,
561    ) -> CmdResult<CmdBody> {
562        let mut send = self.get_send(peer_id.clone()).await?;
563        send.send2_with_resp(cmd, version, body, timeout).await
564    }
565
566    async fn send_cmd(
567        &self,
568        peer_id: &PeerId,
569        cmd: CMD,
570        version: u8,
571        body: CmdBody,
572    ) -> CmdResult<()> {
573        let mut send = self.get_send(peer_id.clone()).await?;
574        send.send_cmd(cmd, version, body).await
575    }
576
577    async fn send_cmd_with_resp(
578        &self,
579        peer_id: &PeerId,
580        cmd: CMD,
581        version: u8,
582        body: CmdBody,
583        timeout: Duration,
584    ) -> CmdResult<CmdBody> {
585        let mut send = self.get_send(peer_id.clone()).await?;
586        send.send_cmd_with_resp(cmd, version, body, timeout).await
587    }
588
589    async fn send_by_specify_tunnel(
590        &self,
591        peer_id: &PeerId,
592        tunnel_id: TunnelId,
593        cmd: CMD,
594        version: u8,
595        body: &[u8],
596    ) -> CmdResult<()> {
597        let mut send = self
598            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
599            .await?;
600        send.send(cmd, version, body).await
601    }
602
603    async fn send_by_specify_tunnel_with_resp(
604        &self,
605        peer_id: &PeerId,
606        tunnel_id: TunnelId,
607        cmd: CMD,
608        version: u8,
609        body: &[u8],
610        timeout: Duration,
611    ) -> CmdResult<CmdBody> {
612        let mut send = self
613            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
614            .await?;
615        send.send_with_resp(cmd, version, body, timeout).await
616    }
617
618    async fn send2_by_specify_tunnel(
619        &self,
620        peer_id: &PeerId,
621        tunnel_id: TunnelId,
622        cmd: CMD,
623        version: u8,
624        body: &[&[u8]],
625    ) -> CmdResult<()> {
626        let mut send = self
627            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
628            .await?;
629        send.send2(cmd, version, body).await
630    }
631
632    async fn send2_by_specify_tunnel_with_resp(
633        &self,
634        peer_id: &PeerId,
635        tunnel_id: TunnelId,
636        cmd: CMD,
637        version: u8,
638        body: &[&[u8]],
639        timeout: Duration,
640    ) -> CmdResult<CmdBody> {
641        let mut send = self
642            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
643            .await?;
644        send.send2_with_resp(cmd, version, body, timeout).await
645    }
646
647    async fn send_cmd_by_specify_tunnel(
648        &self,
649        peer_id: &PeerId,
650        tunnel_id: TunnelId,
651        cmd: CMD,
652        version: u8,
653        body: CmdBody,
654    ) -> CmdResult<()> {
655        let mut send = self
656            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
657            .await?;
658        send.send_cmd(cmd, version, body).await
659    }
660
661    async fn send_cmd_by_specify_tunnel_with_resp(
662        &self,
663        peer_id: &PeerId,
664        tunnel_id: TunnelId,
665        cmd: CMD,
666        version: u8,
667        body: CmdBody,
668        timeout: Duration,
669    ) -> CmdResult<CmdBody> {
670        let mut send = self
671            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
672            .await?;
673        send.send_cmd_with_resp(cmd, version, body, timeout).await
674    }
675
676    async fn clear_all_tunnel(&self) {
677        self.tunnel_pool.clear_all_worker().await
678    }
679
680    async fn get_send(
681        &self,
682        peer_id: &PeerId,
683        tunnel_id: TunnelId,
684    ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
685        Ok(ClassifiedSendGuard {
686            worker_guard: self
687                .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
688                .await?,
689            _p: Default::default(),
690        })
691    }
692}