Skip to main content

sfo_cmd_server/node/
node.rs

1use crate::client::{ClassifiedSendGuard, CommonCmdSend, RespWaiter, RespWaiterRef, gen_resp_id};
2use crate::cmd::CmdHandlerMap;
3use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
4use crate::node::create_recv_handle;
5use crate::server::CmdTunnelListener;
6use crate::{
7    CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId,
8    TunnelId, TunnelIdGenerator, into_pool_err, pool_err,
9};
10use async_named_locker::ObjectHolder;
11use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
12use num::{FromPrimitive, ToPrimitive};
13use sfo_pool::{
14    ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
15    ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult,
16};
17use sfo_split::Splittable;
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::hash::Hash;
21use std::sync::{Arc, Mutex};
22use std::time::Duration;
23
24#[async_trait::async_trait]
25pub trait CmdNodeTunnelFactory<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
26    Send + Sync + 'static
27{
28    async fn create_tunnel(&self, remote_id: &PeerId) -> CmdResult<Splittable<R, W>>;
29}
30
31impl<M, R, W, LEN, CMD> ClassifiedWorker<(PeerId, Option<TunnelId>)>
32    for CommonCmdSend<M, R, W, LEN, CMD>
33where
34    M: CmdTunnelMeta,
35    R: CmdTunnelRead<M>,
36    W: CmdTunnelWrite<M>,
37    LEN: RawEncode
38        + for<'a> RawDecode<'a>
39        + Copy
40        + Send
41        + Sync
42        + 'static
43        + FromPrimitive
44        + ToPrimitive,
45    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
46{
47    fn is_work(&self) -> bool {
48        self.is_work && !self.recv_handle.is_finished()
49    }
50
51    fn is_valid(&self, c: (PeerId, Option<TunnelId>)) -> bool {
52        let (peer_id, tunnel_id) = c;
53        if tunnel_id.is_some() {
54            self.tunnel_id == tunnel_id.unwrap() && peer_id == self.remote_id
55        } else {
56            peer_id == self.remote_id
57        }
58    }
59
60    fn classification(&self) -> (PeerId, Option<TunnelId>) {
61        (self.remote_id.clone(), Some(self.tunnel_id))
62    }
63}
64
65struct CmdWriteFactoryImpl<
66    M: CmdTunnelMeta,
67    R: CmdTunnelRead<M>,
68    W: CmdTunnelWrite<M>,
69    F: CmdNodeTunnelFactory<M, R, W>,
70    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
71    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
72    LISTENER: CmdTunnelListener<M, R, W>,
73> {
74    tunnel_listener: LISTENER,
75    tunnel_factory: F,
76    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
77    tunnel_id_generator: TunnelIdGenerator,
78    resp_waiter: RespWaiterRef,
79    send_cache: Arc<Mutex<HashMap<PeerId, Vec<CommonCmdSend<M, R, W, LEN, CMD>>>>>,
80}
81
82impl<
83    M: CmdTunnelMeta,
84    R: CmdTunnelRead<M>,
85    W: CmdTunnelWrite<M>,
86    F: CmdNodeTunnelFactory<M, R, W>,
87    LEN: RawEncode
88        + for<'a> RawDecode<'a>
89        + Copy
90        + Send
91        + Sync
92        + 'static
93        + FromPrimitive
94        + ToPrimitive
95        + RawFixedBytes,
96    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
97    LISTENER: CmdTunnelListener<M, R, W>,
98> CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
99{
100    pub fn new(
101        tunnel_factory: F,
102        tunnel_listener: LISTENER,
103        cmd_handler: impl CmdHandler<LEN, CMD>,
104        resp_waiter: RespWaiterRef,
105    ) -> Self {
106        Self {
107            tunnel_listener,
108            tunnel_factory,
109            cmd_handler: Arc::new(cmd_handler),
110            tunnel_id_generator: TunnelIdGenerator::new(),
111            resp_waiter,
112            send_cache: Arc::new(Mutex::new(Default::default())),
113        }
114    }
115
116    pub fn start(self: &Arc<Self>) {
117        let this = self.clone();
118        tokio::spawn(async move {
119            if let Err(e) = this.run().await {
120                log::error!("cmd server error: {:?}", e);
121            }
122        });
123    }
124
125    async fn run(self: &Arc<Self>) -> CmdResult<()> {
126        loop {
127            let tunnel = self.tunnel_listener.accept().await?;
128            let peer_id = tunnel.get_remote_peer_id();
129            let tunnel_id = self.tunnel_id_generator.generate();
130            let resp_waiter = self.resp_waiter.clone();
131            let this = self.clone();
132            tokio::spawn(async move {
133                let ret: CmdResult<()> = async move {
134                    let this = this.clone();
135                    let cmd_handler = this.cmd_handler.clone();
136                    let (reader, writer) = tunnel.split();
137                    let remote_id = reader.get_remote_peer_id();
138                    let tunnel_meta = reader.get_tunnel_meta();
139                    let writer = ObjectHolder::new(writer);
140                    let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
141                        reader,
142                        writer.clone(),
143                        tunnel_id,
144                        cmd_handler,
145                    );
146                    {
147                        let mut send_cache = this.send_cache.lock().unwrap();
148                        let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
149                        send_list.push(CommonCmdSend::new(
150                            tunnel_id,
151                            recv_handle,
152                            writer,
153                            resp_waiter,
154                            remote_id,
155                            tunnel_meta,
156                        ));
157                    }
158                    Ok(())
159                }
160                .await;
161                if let Err(e) = ret {
162                    log::error!("peer connection error: {:?}", e);
163                }
164            });
165        }
166    }
167}
168
169#[async_trait::async_trait]
170impl<
171    M: CmdTunnelMeta,
172    R: CmdTunnelRead<M>,
173    W: CmdTunnelWrite<M>,
174    F: CmdNodeTunnelFactory<M, R, W>,
175    LEN: RawEncode
176        + for<'a> RawDecode<'a>
177        + Copy
178        + Send
179        + Sync
180        + 'static
181        + FromPrimitive
182        + ToPrimitive
183        + RawFixedBytes,
184    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
185    LISTENER: CmdTunnelListener<M, R, W>,
186> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
187    for CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
188{
189    async fn create(
190        &self,
191        c: Option<(PeerId, Option<TunnelId>)>,
192    ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
193        if c.is_some() {
194            let (peer_id, tunnel_id) = c.unwrap();
195            if tunnel_id.is_some() {
196                let mut send_cache = self.send_cache.lock().unwrap();
197                if let Some(send_list) = send_cache.get_mut(&peer_id) {
198                    let mut send_index = None;
199                    for (index, send) in send_list.iter().enumerate() {
200                        if send.get_tunnel_id() == tunnel_id.unwrap() {
201                            send_index = Some(index);
202                            break;
203                        }
204                    }
205                    if let Some(send_index) = send_index {
206                        let send = send_list.remove(send_index);
207                        Ok(send)
208                    } else {
209                        Err(pool_err!(
210                            PoolErrorCode::Failed,
211                            "tunnel {:?} not found",
212                            tunnel_id.unwrap()
213                        ))
214                    }
215                } else {
216                    Err(pool_err!(
217                        PoolErrorCode::Failed,
218                        "tunnel {:?} not found",
219                        tunnel_id.unwrap()
220                    ))
221                }
222            } else {
223                {
224                    let mut send_cache = self.send_cache.lock().unwrap();
225                    if let Some(send_list) = send_cache.get_mut(&peer_id) {
226                        if !send_list.is_empty() {
227                            let send = send_list.pop().unwrap();
228                            if send_list.is_empty() {
229                                send_cache.remove(&peer_id);
230                            }
231                            return Ok(send);
232                        }
233                    }
234                }
235                let tunnel = self
236                    .tunnel_factory
237                    .create_tunnel(&peer_id)
238                    .await
239                    .map_err(into_pool_err!(PoolErrorCode::Failed))?;
240                let tunnel_id = self.tunnel_id_generator.generate();
241                let (recv, write) = tunnel.split();
242                let remote_id = recv.get_remote_peer_id();
243                let tunnel_meta = recv.get_tunnel_meta();
244                let write = ObjectHolder::new(write);
245                let cmd_handler = self.cmd_handler.clone();
246                let handle = create_recv_handle::<M, R, W, LEN, CMD>(
247                    recv,
248                    write.clone(),
249                    tunnel_id,
250                    cmd_handler,
251                );
252                Ok(CommonCmdSend::new(
253                    tunnel_id,
254                    handle,
255                    write,
256                    self.resp_waiter.clone(),
257                    remote_id,
258                    tunnel_meta,
259                ))
260            }
261        } else {
262            Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
263        }
264    }
265}
266
267pub struct CmdNodeWriteFactory<
268    M: CmdTunnelMeta,
269    R: CmdTunnelRead<M>,
270    W: CmdTunnelWrite<M>,
271    F: CmdNodeTunnelFactory<M, R, W>,
272    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
273    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
274    LISTENER: CmdTunnelListener<M, R, W>,
275> {
276    inner: Arc<CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>>,
277}
278
279impl<
280    M: CmdTunnelMeta,
281    R: CmdTunnelRead<M>,
282    W: CmdTunnelWrite<M>,
283    F: CmdNodeTunnelFactory<M, R, W>,
284    LEN: RawEncode
285        + for<'a> RawDecode<'a>
286        + Copy
287        + Send
288        + Sync
289        + 'static
290        + FromPrimitive
291        + ToPrimitive
292        + RawFixedBytes,
293    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
294    LISTENER: CmdTunnelListener<M, R, W>,
295> CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
296{
297    pub(crate) fn new(
298        tunnel_factory: F,
299        tunnel_listener: LISTENER,
300        cmd_handler: impl CmdHandler<LEN, CMD>,
301        resp_waiter: RespWaiterRef,
302    ) -> Self {
303        Self {
304            inner: Arc::new(CmdWriteFactoryImpl::new(
305                tunnel_factory,
306                tunnel_listener,
307                cmd_handler,
308                resp_waiter,
309            )),
310        }
311    }
312
313    pub fn start(&self) {
314        self.inner.start();
315    }
316}
317
318#[async_trait::async_trait]
319impl<
320    M: CmdTunnelMeta,
321    R: CmdTunnelRead<M>,
322    W: CmdTunnelWrite<M>,
323    F: CmdNodeTunnelFactory<M, R, W>,
324    LEN: RawEncode
325        + for<'a> RawDecode<'a>
326        + Copy
327        + Send
328        + Sync
329        + 'static
330        + FromPrimitive
331        + ToPrimitive
332        + RawFixedBytes,
333    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
334    LISTENER: CmdTunnelListener<M, R, W>,
335> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
336    for CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
337{
338    async fn create(
339        &self,
340        c: Option<(PeerId, Option<TunnelId>)>,
341    ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
342        self.inner.create(c).await
343    }
344}
345pub struct DefaultCmdNode<
346    M: CmdTunnelMeta,
347    R: CmdTunnelRead<M>,
348    W: CmdTunnelWrite<M>,
349    F: CmdNodeTunnelFactory<M, R, W>,
350    LEN: RawEncode
351        + for<'a> RawDecode<'a>
352        + Copy
353        + Send
354        + Sync
355        + 'static
356        + FromPrimitive
357        + ToPrimitive
358        + RawFixedBytes,
359    CMD: RawEncode
360        + for<'a> RawDecode<'a>
361        + Copy
362        + Send
363        + Sync
364        + 'static
365        + RawFixedBytes
366        + Eq
367        + Hash
368        + Debug,
369    LISTENER: CmdTunnelListener<M, R, W>,
370> {
371    tunnel_pool: ClassifiedWorkerPoolRef<
372        (PeerId, Option<TunnelId>),
373        CommonCmdSend<M, R, W, LEN, CMD>,
374        CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
375    >,
376    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
377}
378
379impl<
380    M: CmdTunnelMeta,
381    R: CmdTunnelRead<M>,
382    W: CmdTunnelWrite<M>,
383    F: CmdNodeTunnelFactory<M, R, W>,
384    LEN: RawEncode
385        + for<'a> RawDecode<'a>
386        + Copy
387        + Send
388        + Sync
389        + 'static
390        + FromPrimitive
391        + ToPrimitive
392        + RawFixedBytes,
393    CMD: RawEncode
394        + for<'a> RawDecode<'a>
395        + Copy
396        + Send
397        + Sync
398        + 'static
399        + RawFixedBytes
400        + Eq
401        + Hash
402        + Debug,
403    LISTENER: CmdTunnelListener<M, R, W>,
404> DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
405{
406    pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
407        let cmd_handler_map = Arc::new(CmdHandlerMap::new());
408        let handler_map = cmd_handler_map.clone();
409        let resp_waiter = Arc::new(RespWaiter::new());
410        let waiter = resp_waiter.clone();
411        let write_factory = CmdNodeWriteFactory::<M, R, W, _, LEN, CMD, LISTENER>::new(
412            factory,
413            listener,
414            move |peer_id: PeerId,
415                  tunnel_id: TunnelId,
416                  header: CmdHeader<LEN, CMD>,
417                  body_read: CmdBody| {
418                let handler_map = handler_map.clone();
419                let waiter = waiter.clone();
420                async move {
421                    if header.is_resp() && header.seq().is_some() {
422                        let resp_id =
423                            gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
424                        let _ = waiter.set_result(resp_id, body_read);
425                        Ok(None)
426                    } else {
427                        if let Some(handler) = handler_map.get(header.cmd_code()) {
428                            handler.handle(peer_id, tunnel_id, header, body_read).await
429                        } else {
430                            Ok(None)
431                        }
432                    }
433                }
434            },
435            resp_waiter.clone(),
436        );
437        write_factory.start();
438        Arc::new(Self {
439            tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
440            cmd_handler_map,
441        })
442    }
443
444    async fn get_send(
445        &self,
446        peer_id: PeerId,
447    ) -> CmdResult<
448        ClassifiedWorkerGuard<
449            (PeerId, Option<TunnelId>),
450            CommonCmdSend<M, R, W, LEN, CMD>,
451            CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
452        >,
453    > {
454        self.tunnel_pool
455            .get_classified_worker((peer_id, None))
456            .await
457            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
458    }
459
460    async fn get_send_of_tunnel_id(
461        &self,
462        peer_id: PeerId,
463        tunnel_id: TunnelId,
464    ) -> CmdResult<
465        ClassifiedWorkerGuard<
466            (PeerId, Option<TunnelId>),
467            CommonCmdSend<M, R, W, LEN, CMD>,
468            CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
469        >,
470    > {
471        self.tunnel_pool
472            .get_classified_worker((peer_id, Some(tunnel_id)))
473            .await
474            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
475    }
476}
477
478pub type CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
479    (PeerId, Option<TunnelId>),
480    M,
481    CommonCmdSend<M, R, W, LEN, CMD>,
482    CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
483>;
484#[async_trait::async_trait]
485impl<
486    M: CmdTunnelMeta,
487    R: CmdTunnelRead<M>,
488    W: CmdTunnelWrite<M>,
489    F: CmdNodeTunnelFactory<M, R, W>,
490    LEN: RawEncode
491        + for<'a> RawDecode<'a>
492        + Copy
493        + RawFixedBytes
494        + Sync
495        + Send
496        + 'static
497        + FromPrimitive
498        + ToPrimitive,
499    CMD: RawEncode
500        + for<'a> RawDecode<'a>
501        + Copy
502        + RawFixedBytes
503        + Sync
504        + Send
505        + 'static
506        + Eq
507        + Hash
508        + Debug,
509    LISTENER: CmdTunnelListener<M, R, W>,
510>
511    CmdNode<
512        LEN,
513        CMD,
514        M,
515        CommonCmdSend<M, R, W, LEN, CMD>,
516        CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>,
517    > for DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
518{
519    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
520        self.cmd_handler_map.insert(cmd, handler);
521    }
522
523    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
524        let mut send = self.get_send(peer_id.clone()).await?;
525        send.send(cmd, version, body).await
526    }
527
528    async fn send_with_resp(
529        &self,
530        peer_id: &PeerId,
531        cmd: CMD,
532        version: u8,
533        body: &[u8],
534        timeout: Duration,
535    ) -> CmdResult<CmdBody> {
536        let mut send = self.get_send(peer_id.clone()).await?;
537        send.send_with_resp(cmd, version, body, timeout).await
538    }
539
540    async fn send2(
541        &self,
542        peer_id: &PeerId,
543        cmd: CMD,
544        version: u8,
545        body: &[&[u8]],
546    ) -> CmdResult<()> {
547        let mut send = self.get_send(peer_id.clone()).await?;
548        send.send2(cmd, version, body).await
549    }
550
551    async fn send2_with_resp(
552        &self,
553        peer_id: &PeerId,
554        cmd: CMD,
555        version: u8,
556        body: &[&[u8]],
557        timeout: Duration,
558    ) -> CmdResult<CmdBody> {
559        let mut send = self.get_send(peer_id.clone()).await?;
560        send.send2_with_resp(cmd, version, body, timeout).await
561    }
562
563    async fn send_cmd(
564        &self,
565        peer_id: &PeerId,
566        cmd: CMD,
567        version: u8,
568        body: CmdBody,
569    ) -> CmdResult<()> {
570        let mut send = self.get_send(peer_id.clone()).await?;
571        send.send_cmd(cmd, version, body).await
572    }
573
574    async fn send_cmd_with_resp(
575        &self,
576        peer_id: &PeerId,
577        cmd: CMD,
578        version: u8,
579        body: CmdBody,
580        timeout: Duration,
581    ) -> CmdResult<CmdBody> {
582        let mut send = self.get_send(peer_id.clone()).await?;
583        send.send_cmd_with_resp(cmd, version, body, timeout).await
584    }
585
586    async fn send_by_specify_tunnel(
587        &self,
588        peer_id: &PeerId,
589        tunnel_id: TunnelId,
590        cmd: CMD,
591        version: u8,
592        body: &[u8],
593    ) -> CmdResult<()> {
594        let mut send = self
595            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
596            .await?;
597        send.send(cmd, version, body).await
598    }
599
600    async fn send_by_specify_tunnel_with_resp(
601        &self,
602        peer_id: &PeerId,
603        tunnel_id: TunnelId,
604        cmd: CMD,
605        version: u8,
606        body: &[u8],
607        timeout: Duration,
608    ) -> CmdResult<CmdBody> {
609        let mut send = self
610            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
611            .await?;
612        send.send_with_resp(cmd, version, body, timeout).await
613    }
614
615    async fn send2_by_specify_tunnel(
616        &self,
617        peer_id: &PeerId,
618        tunnel_id: TunnelId,
619        cmd: CMD,
620        version: u8,
621        body: &[&[u8]],
622    ) -> CmdResult<()> {
623        let mut send = self
624            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
625            .await?;
626        send.send2(cmd, version, body).await
627    }
628
629    async fn send2_by_specify_tunnel_with_resp(
630        &self,
631        peer_id: &PeerId,
632        tunnel_id: TunnelId,
633        cmd: CMD,
634        version: u8,
635        body: &[&[u8]],
636        timeout: Duration,
637    ) -> CmdResult<CmdBody> {
638        let mut send = self
639            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
640            .await?;
641        send.send2_with_resp(cmd, version, body, timeout).await
642    }
643
644    async fn send_cmd_by_specify_tunnel(
645        &self,
646        peer_id: &PeerId,
647        tunnel_id: TunnelId,
648        cmd: CMD,
649        version: u8,
650        body: CmdBody,
651    ) -> CmdResult<()> {
652        let mut send = self
653            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
654            .await?;
655        send.send_cmd(cmd, version, body).await
656    }
657
658    async fn send_cmd_by_specify_tunnel_with_resp(
659        &self,
660        peer_id: &PeerId,
661        tunnel_id: TunnelId,
662        cmd: CMD,
663        version: u8,
664        body: CmdBody,
665        timeout: Duration,
666    ) -> CmdResult<CmdBody> {
667        let mut send = self
668            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
669            .await?;
670        send.send_cmd_with_resp(cmd, version, body, timeout).await
671    }
672
673    async fn clear_all_tunnel(&self) {
674        self.tunnel_pool.clear_all_worker().await
675    }
676
677    async fn get_send(
678        &self,
679        peer_id: &PeerId,
680        tunnel_id: TunnelId,
681    ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
682        Ok(ClassifiedSendGuard {
683            worker_guard: self
684                .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
685                .await?,
686            _p: Default::default(),
687        })
688    }
689}