// sfo_cmd_server/node/classified_node.rs

1use crate::client::{
2    ClassifiedCmdSend, ClassifiedCmdTunnelRead, ClassifiedCmdTunnelWrite, ClassifiedSendGuard,
3    RespWaiter, RespWaiterRef, gen_resp_id,
4};
5use crate::cmd::CmdHandlerMap;
6use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
7use crate::node::create_recv_handle;
8use crate::server::CmdTunnelListener;
9use crate::{
10    ClassifiedCmdNode, CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, PeerId, TunnelId,
11    TunnelIdGenerator, into_pool_err, pool_err,
12};
13use async_named_locker::ObjectHolder;
14use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
15use num::{FromPrimitive, ToPrimitive};
16use sfo_pool::{
17    ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
18    ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification,
19};
20use sfo_split::Splittable;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::hash::Hash;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27#[derive(Debug, Clone, Eq, Hash)]
28pub struct CmdNodeTunnelClassification<C: WorkerClassification> {
29    pub peer_id: Option<PeerId>,
30    pub tunnel_id: Option<TunnelId>,
31    pub classification: Option<C>,
32}
33
34impl<C: WorkerClassification> PartialEq for CmdNodeTunnelClassification<C> {
35    fn eq(&self, other: &Self) -> bool {
36        self.peer_id == other.peer_id
37            && self.tunnel_id == other.tunnel_id
38            && self.classification == other.classification
39    }
40}
41
42impl<C, M, R, W, LEN, CMD> ClassifiedWorker<CmdNodeTunnelClassification<C>>
43    for ClassifiedCmdSend<C, M, R, W, LEN, CMD>
44where
45    C: WorkerClassification,
46    M: CmdTunnelMeta,
47    R: ClassifiedCmdTunnelRead<C, M>,
48    W: ClassifiedCmdTunnelWrite<C, M>,
49    LEN: RawEncode
50        + for<'a> RawDecode<'a>
51        + Copy
52        + Send
53        + Sync
54        + 'static
55        + FromPrimitive
56        + ToPrimitive,
57    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
58{
59    fn is_work(&self) -> bool {
60        self.is_work && !self.recv_handle.is_finished()
61    }
62
63    fn is_valid(&self, c: CmdNodeTunnelClassification<C>) -> bool {
64        if c.peer_id.is_some() && c.peer_id.as_ref().unwrap() != &self.remote_id {
65            return false;
66        }
67
68        if c.tunnel_id.is_some() {
69            self.tunnel_id == c.tunnel_id.unwrap()
70        } else {
71            if c.classification.is_some() {
72                self.classification == c.classification.unwrap()
73            } else {
74                true
75            }
76        }
77    }
78
79    fn classification(&self) -> CmdNodeTunnelClassification<C> {
80        CmdNodeTunnelClassification {
81            peer_id: Some(self.remote_id.clone()),
82            tunnel_id: Some(self.tunnel_id),
83            classification: Some(self.classification.clone()),
84        }
85    }
86}
87
88#[async_trait::async_trait]
89pub trait ClassifiedCmdNodeTunnelFactory<
90    C: WorkerClassification,
91    M: CmdTunnelMeta,
92    R: ClassifiedCmdTunnelRead<C, M>,
93    W: ClassifiedCmdTunnelWrite<C, M>,
94>: Send + Sync + 'static
95{
96    async fn create_tunnel(
97        &self,
98        classification: Option<CmdNodeTunnelClassification<C>>,
99    ) -> CmdResult<Splittable<R, W>>;
100}
101
102struct CmdWriteFactoryImpl<
103    C: WorkerClassification,
104    M: CmdTunnelMeta,
105    R: ClassifiedCmdTunnelRead<C, M>,
106    W: ClassifiedCmdTunnelWrite<C, M>,
107    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
108    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
109    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
110    LISTENER: CmdTunnelListener<M, R, W>,
111> {
112    tunnel_listener: LISTENER,
113    tunnel_factory: F,
114    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
115    tunnel_id_generator: TunnelIdGenerator,
116    resp_waiter: RespWaiterRef,
117    send_cache: Arc<Mutex<HashMap<PeerId, Vec<ClassifiedCmdSend<C, M, R, W, LEN, CMD>>>>>,
118}
119
120impl<
121    C: WorkerClassification,
122    M: CmdTunnelMeta,
123    R: ClassifiedCmdTunnelRead<C, M>,
124    W: ClassifiedCmdTunnelWrite<C, M>,
125    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
126    LEN: RawEncode
127        + for<'a> RawDecode<'a>
128        + Copy
129        + Send
130        + Sync
131        + 'static
132        + FromPrimitive
133        + ToPrimitive
134        + RawFixedBytes,
135    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
136    LISTENER: CmdTunnelListener<M, R, W>,
137> CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
138{
139    pub fn new(
140        tunnel_factory: F,
141        tunnel_listener: LISTENER,
142        cmd_handler: impl CmdHandler<LEN, CMD>,
143        resp_waiter: RespWaiterRef,
144    ) -> Self {
145        Self {
146            tunnel_listener,
147            tunnel_factory,
148            cmd_handler: Arc::new(cmd_handler),
149            tunnel_id_generator: TunnelIdGenerator::new(),
150            resp_waiter,
151            send_cache: Arc::new(Mutex::new(Default::default())),
152        }
153    }
154
155    pub fn start(self: &Arc<Self>) {
156        let this = self.clone();
157        tokio::spawn(async move {
158            if let Err(e) = this.run().await {
159                log::error!("cmd server error: {:?}", e);
160            }
161        });
162    }
163
164    async fn run(self: &Arc<Self>) -> CmdResult<()> {
165        loop {
166            let tunnel = self.tunnel_listener.accept().await?;
167            let peer_id = tunnel.get_remote_peer_id();
168            let classification = tunnel.get_classification();
169            let tunnel_id = self.tunnel_id_generator.generate();
170            let this = self.clone();
171            let resp_waiter = self.resp_waiter.clone();
172            tokio::spawn(async move {
173                let ret: CmdResult<()> = async move {
174                    let this = this.clone();
175                    let cmd_handler = this.cmd_handler.clone();
176                    let (reader, writer) = tunnel.split();
177                    let tunnel_meta = reader.get_tunnel_meta();
178                    let remote_id = writer.get_remote_peer_id();
179                    let writer = ObjectHolder::new(writer);
180                    let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
181                        reader,
182                        writer.clone(),
183                        tunnel_id,
184                        cmd_handler,
185                    );
186                    {
187                        let mut send_cache = this.send_cache.lock().unwrap();
188                        let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
189                        send_list.push(ClassifiedCmdSend::new(
190                            tunnel_id,
191                            classification,
192                            recv_handle,
193                            writer,
194                            resp_waiter,
195                            remote_id,
196                            tunnel_meta,
197                        ));
198                    }
199                    Ok(())
200                }
201                .await;
202                if let Err(e) = ret {
203                    log::error!("peer connection error: {:?}", e);
204                }
205            });
206        }
207    }
208}
209
210#[async_trait::async_trait]
211impl<
212    C: WorkerClassification,
213    M: CmdTunnelMeta,
214    R: ClassifiedCmdTunnelRead<C, M>,
215    W: ClassifiedCmdTunnelWrite<C, M>,
216    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
217    LEN: RawEncode
218        + for<'a> RawDecode<'a>
219        + Copy
220        + Send
221        + Sync
222        + 'static
223        + FromPrimitive
224        + ToPrimitive
225        + RawFixedBytes,
226    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
227    LISTENER: CmdTunnelListener<M, R, W>,
228> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
229    for CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
230{
231    async fn create(
232        &self,
233        c: Option<CmdNodeTunnelClassification<C>>,
234    ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
235        if c.is_some() {
236            let classification = c.unwrap();
237            if classification.peer_id.is_some() {
238                let peer_id = classification.peer_id.clone().unwrap();
239                let tunnel_id = classification.tunnel_id;
240                if tunnel_id.is_some() {
241                    let mut send_cache = self.send_cache.lock().unwrap();
242                    if let Some(send_list) = send_cache.get_mut(&peer_id) {
243                        let mut send_index = None;
244                        for (index, send) in send_list.iter().enumerate() {
245                            if send.get_tunnel_id() == tunnel_id.unwrap() {
246                                send_index = Some(index);
247                                break;
248                            }
249                        }
250                        if let Some(send_index) = send_index {
251                            let send = send_list.remove(send_index);
252                            Ok(send)
253                        } else {
254                            Err(pool_err!(
255                                PoolErrorCode::Failed,
256                                "tunnel {:?} not found",
257                                tunnel_id.unwrap()
258                            ))
259                        }
260                    } else {
261                        Err(pool_err!(
262                            PoolErrorCode::Failed,
263                            "tunnel {:?} not found",
264                            tunnel_id.unwrap()
265                        ))
266                    }
267                } else {
268                    {
269                        let mut send_cache = self.send_cache.lock().unwrap();
270                        if let Some(send_list) = send_cache.get_mut(&peer_id) {
271                            if !send_list.is_empty() {
272                                let send = send_list.pop().unwrap();
273                                if send_list.is_empty() {
274                                    send_cache.remove(&peer_id);
275                                }
276                                return Ok(send);
277                            }
278                        }
279                    }
280                    let tunnel = self
281                        .tunnel_factory
282                        .create_tunnel(Some(classification))
283                        .await
284                        .map_err(into_pool_err!(PoolErrorCode::Failed))?;
285                    let classification = tunnel.get_classification();
286                    let tunnel_id = self.tunnel_id_generator.generate();
287                    let (recv, write) = tunnel.split();
288                    let remote_id = write.get_remote_peer_id();
289                    let tunnel_meta = recv.get_tunnel_meta();
290                    let write = ObjectHolder::new(write);
291                    let cmd_handler = self.cmd_handler.clone();
292                    let handle = create_recv_handle::<M, R, W, LEN, CMD>(
293                        recv,
294                        write.clone(),
295                        tunnel_id,
296                        cmd_handler,
297                    );
298                    Ok(ClassifiedCmdSend::new(
299                        tunnel_id,
300                        classification,
301                        handle,
302                        write,
303                        self.resp_waiter.clone(),
304                        remote_id,
305                        tunnel_meta,
306                    ))
307                }
308            } else {
309                if classification.tunnel_id.is_some() {
310                    Err(pool_err!(
311                        PoolErrorCode::Failed,
312                        "must set peer id when set tunnel id"
313                    ))
314                } else {
315                    let tunnel = self
316                        .tunnel_factory
317                        .create_tunnel(Some(classification))
318                        .await
319                        .map_err(into_pool_err!(PoolErrorCode::Failed))?;
320                    let classification = tunnel.get_classification();
321                    let tunnel_id = self.tunnel_id_generator.generate();
322                    let (recv, write) = tunnel.split();
323                    let remote_id = write.get_remote_peer_id();
324                    let tunnel_meta = write.get_tunnel_meta();
325                    let write = ObjectHolder::new(write);
326                    let cmd_handler = self.cmd_handler.clone();
327                    let handle = create_recv_handle::<M, R, W, LEN, CMD>(
328                        recv,
329                        write.clone(),
330                        tunnel_id,
331                        cmd_handler,
332                    );
333                    Ok(ClassifiedCmdSend::new(
334                        tunnel_id,
335                        classification,
336                        handle,
337                        write,
338                        self.resp_waiter.clone(),
339                        remote_id,
340                        tunnel_meta,
341                    ))
342                }
343            }
344        } else {
345            Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
346        }
347    }
348}
349
350pub struct ClassifiedCmdNodeWriteFactory<
351    C: WorkerClassification,
352    M: CmdTunnelMeta,
353    R: ClassifiedCmdTunnelRead<C, M>,
354    W: ClassifiedCmdTunnelWrite<C, M>,
355    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
356    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
357    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
358    LISTENER: CmdTunnelListener<M, R, W>,
359> {
360    inner: Arc<CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>>,
361}
362
363impl<
364    C: WorkerClassification,
365    M: CmdTunnelMeta,
366    R: ClassifiedCmdTunnelRead<C, M>,
367    W: ClassifiedCmdTunnelWrite<C, M>,
368    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
369    LEN: RawEncode
370        + for<'a> RawDecode<'a>
371        + Copy
372        + Send
373        + Sync
374        + 'static
375        + FromPrimitive
376        + ToPrimitive
377        + RawFixedBytes
378        + RawFixedBytes,
379    CMD: RawEncode
380        + for<'a> RawDecode<'a>
381        + Copy
382        + Send
383        + Sync
384        + 'static
385        + Debug
386        + RawFixedBytes
387        + RawFixedBytes,
388    LISTENER: CmdTunnelListener<M, R, W>,
389> ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
390{
391    pub(crate) fn new(
392        tunnel_factory: F,
393        tunnel_listener: LISTENER,
394        cmd_handler: impl CmdHandler<LEN, CMD>,
395        resp_waiter: RespWaiterRef,
396    ) -> Self {
397        Self {
398            inner: Arc::new(CmdWriteFactoryImpl::new(
399                tunnel_factory,
400                tunnel_listener,
401                cmd_handler,
402                resp_waiter,
403            )),
404        }
405    }
406
407    pub fn start(&self) {
408        self.inner.start();
409    }
410}
411
412#[async_trait::async_trait]
413impl<
414    C: WorkerClassification,
415    M: CmdTunnelMeta,
416    R: ClassifiedCmdTunnelRead<C, M>,
417    W: ClassifiedCmdTunnelWrite<C, M>,
418    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
419    LEN: RawEncode
420        + for<'a> RawDecode<'a>
421        + Copy
422        + Send
423        + Sync
424        + 'static
425        + FromPrimitive
426        + ToPrimitive
427        + RawFixedBytes,
428    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
429    LISTENER: CmdTunnelListener<M, R, W>,
430> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
431    for ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
432{
433    async fn create(
434        &self,
435        c: Option<CmdNodeTunnelClassification<C>>,
436    ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
437        self.inner.create(c).await
438    }
439}
440
441pub struct DefaultClassifiedCmdNode<
442    C: WorkerClassification,
443    M: CmdTunnelMeta,
444    R: ClassifiedCmdTunnelRead<C, M>,
445    W: ClassifiedCmdTunnelWrite<C, M>,
446    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
447    LEN: RawEncode
448        + for<'a> RawDecode<'a>
449        + Copy
450        + Send
451        + Sync
452        + 'static
453        + FromPrimitive
454        + ToPrimitive
455        + RawFixedBytes,
456    CMD: RawEncode
457        + for<'a> RawDecode<'a>
458        + Copy
459        + Send
460        + Sync
461        + 'static
462        + RawFixedBytes
463        + Eq
464        + Hash
465        + Debug,
466    LISTENER: CmdTunnelListener<M, R, W>,
467> {
468    tunnel_pool: ClassifiedWorkerPoolRef<
469        CmdNodeTunnelClassification<C>,
470        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
471        ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
472    >,
473    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
474}
475
476impl<
477    C: WorkerClassification,
478    M: CmdTunnelMeta,
479    R: ClassifiedCmdTunnelRead<C, M>,
480    W: ClassifiedCmdTunnelWrite<C, M>,
481    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
482    LEN: RawEncode
483        + for<'a> RawDecode<'a>
484        + Copy
485        + Send
486        + Sync
487        + 'static
488        + FromPrimitive
489        + ToPrimitive
490        + RawFixedBytes,
491    CMD: RawEncode
492        + for<'a> RawDecode<'a>
493        + Copy
494        + Send
495        + Sync
496        + 'static
497        + RawFixedBytes
498        + Eq
499        + Hash
500        + Debug,
501    LISTENER: CmdTunnelListener<M, R, W>,
502> DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
503{
504    pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
505        let cmd_handler_map = Arc::new(CmdHandlerMap::new());
506        let handler_map = cmd_handler_map.clone();
507        let resp_waiter = Arc::new(RespWaiter::new());
508        let waiter = resp_waiter.clone();
509        let write_factory = ClassifiedCmdNodeWriteFactory::<C, M, R, W, _, LEN, CMD, LISTENER>::new(
510            factory,
511            listener,
512            move |local_id: PeerId,
513                  peer_id: PeerId,
514                  tunnel_id: TunnelId,
515                  header: CmdHeader<LEN, CMD>,
516                  body_read: CmdBody| {
517                let handler_map = handler_map.clone();
518                let waiter = waiter.clone();
519                async move {
520                    if header.is_resp() && header.seq().is_some() {
521                        let resp_id =
522                            gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
523                        let _ = waiter.set_result(resp_id, body_read);
524                        Ok(None)
525                    } else {
526                        if let Some(handler) = handler_map.get(header.cmd_code()) {
527                            handler
528                                .handle(local_id, peer_id, tunnel_id, header, body_read)
529                                .await
530                        } else {
531                            Ok(None)
532                        }
533                    }
534                }
535            },
536            resp_waiter.clone(),
537        );
538        write_factory.start();
539        Arc::new(Self {
540            tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
541            cmd_handler_map,
542        })
543    }
544
545    async fn get_send(
546        &self,
547        peer_id: PeerId,
548    ) -> CmdResult<
549        ClassifiedWorkerGuard<
550            CmdNodeTunnelClassification<C>,
551            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
552            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
553        >,
554    > {
555        self.tunnel_pool
556            .get_classified_worker(CmdNodeTunnelClassification {
557                peer_id: Some(peer_id),
558                tunnel_id: None,
559                classification: None,
560            })
561            .await
562            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
563    }
564
565    async fn get_send_of_tunnel_id(
566        &self,
567        peer_id: PeerId,
568        tunnel_id: TunnelId,
569    ) -> CmdResult<
570        ClassifiedWorkerGuard<
571            CmdNodeTunnelClassification<C>,
572            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
573            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
574        >,
575    > {
576        self.tunnel_pool
577            .get_classified_worker(CmdNodeTunnelClassification {
578                peer_id: Some(peer_id),
579                tunnel_id: Some(tunnel_id),
580                classification: None,
581            })
582            .await
583            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
584    }
585
586    async fn get_classified_send(
587        &self,
588        classification: C,
589    ) -> CmdResult<
590        ClassifiedWorkerGuard<
591            CmdNodeTunnelClassification<C>,
592            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
593            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
594        >,
595    > {
596        self.tunnel_pool
597            .get_classified_worker(CmdNodeTunnelClassification {
598                peer_id: None,
599                tunnel_id: None,
600                classification: Some(classification),
601            })
602            .await
603            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
604    }
605
606    async fn get_peer_classified_send(
607        &self,
608        peer_id: PeerId,
609        classification: C,
610    ) -> CmdResult<
611        ClassifiedWorkerGuard<
612            CmdNodeTunnelClassification<C>,
613            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
614            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
615        >,
616    > {
617        self.tunnel_pool
618            .get_classified_worker(CmdNodeTunnelClassification {
619                peer_id: Some(peer_id),
620                tunnel_id: None,
621                classification: Some(classification),
622            })
623            .await
624            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
625    }
626}
627
/// Send guard type of [`DefaultClassifiedCmdNode`]: a [`ClassifiedSendGuard`]
/// instantiated with this module's tunnel classification, sender and worker
/// factory types.
pub type ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
    CmdNodeTunnelClassification<C>,
    M,
    ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
    ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
>;
634#[async_trait::async_trait]
635impl<
636    C: WorkerClassification,
637    M: CmdTunnelMeta,
638    R: ClassifiedCmdTunnelRead<C, M>,
639    W: ClassifiedCmdTunnelWrite<C, M>,
640    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
641    LEN: RawEncode
642        + for<'a> RawDecode<'a>
643        + Copy
644        + Send
645        + Sync
646        + 'static
647        + FromPrimitive
648        + ToPrimitive
649        + RawFixedBytes,
650    CMD: RawEncode
651        + for<'a> RawDecode<'a>
652        + Copy
653        + Send
654        + Sync
655        + 'static
656        + RawFixedBytes
657        + Eq
658        + Hash
659        + Debug,
660    LISTENER: CmdTunnelListener<M, R, W>,
661>
662    CmdNode<
663        LEN,
664        CMD,
665        M,
666        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
667        ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
668    > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
669{
670    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
671        self.cmd_handler_map.insert(cmd, handler)
672    }
673
674    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
675        let mut send = self.get_send(peer_id.clone()).await?;
676        send.send(cmd, version, body).await
677    }
678
679    async fn send_with_resp(
680        &self,
681        peer_id: &PeerId,
682        cmd: CMD,
683        version: u8,
684        body: &[u8],
685        timeout: Duration,
686    ) -> CmdResult<CmdBody> {
687        let mut send = self.get_send(peer_id.clone()).await?;
688        send.send_with_resp(cmd, version, body, timeout).await
689    }
690
691    async fn send2(
692        &self,
693        peer_id: &PeerId,
694        cmd: CMD,
695        version: u8,
696        body: &[&[u8]],
697    ) -> CmdResult<()> {
698        let mut send = self.get_send(peer_id.clone()).await?;
699        send.send2(cmd, version, body).await
700    }
701
702    async fn send2_with_resp(
703        &self,
704        peer_id: &PeerId,
705        cmd: CMD,
706        version: u8,
707        body: &[&[u8]],
708        timeout: Duration,
709    ) -> CmdResult<CmdBody> {
710        let mut send = self.get_send(peer_id.clone()).await?;
711        send.send2_with_resp(cmd, version, body, timeout).await
712    }
713
714    async fn send_cmd(
715        &self,
716        peer_id: &PeerId,
717        cmd: CMD,
718        version: u8,
719        body: CmdBody,
720    ) -> CmdResult<()> {
721        let mut send = self.get_send(peer_id.clone()).await?;
722        send.send_cmd(cmd, version, body).await
723    }
724
725    async fn send_cmd_with_resp(
726        &self,
727        peer_id: &PeerId,
728        cmd: CMD,
729        version: u8,
730        body: CmdBody,
731        timeout: Duration,
732    ) -> CmdResult<CmdBody> {
733        let mut send = self.get_send(peer_id.clone()).await?;
734        send.send_cmd_with_resp(cmd, version, body, timeout).await
735    }
736
737    async fn send_by_specify_tunnel(
738        &self,
739        peer_id: &PeerId,
740        tunnel_id: TunnelId,
741        cmd: CMD,
742        version: u8,
743        body: &[u8],
744    ) -> CmdResult<()> {
745        let mut send = self
746            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
747            .await?;
748        send.send(cmd, version, body).await
749    }
750
751    async fn send_by_specify_tunnel_with_resp(
752        &self,
753        peer_id: &PeerId,
754        tunnel_id: TunnelId,
755        cmd: CMD,
756        version: u8,
757        body: &[u8],
758        timeout: Duration,
759    ) -> CmdResult<CmdBody> {
760        let mut send = self
761            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
762            .await?;
763        send.send_with_resp(cmd, version, body, timeout).await
764    }
765
766    async fn send2_by_specify_tunnel(
767        &self,
768        peer_id: &PeerId,
769        tunnel_id: TunnelId,
770        cmd: CMD,
771        version: u8,
772        body: &[&[u8]],
773    ) -> CmdResult<()> {
774        let mut send = self
775            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
776            .await?;
777        send.send2(cmd, version, body).await
778    }
779
780    async fn send2_by_specify_tunnel_with_resp(
781        &self,
782        peer_id: &PeerId,
783        tunnel_id: TunnelId,
784        cmd: CMD,
785        version: u8,
786        body: &[&[u8]],
787        timeout: Duration,
788    ) -> CmdResult<CmdBody> {
789        let mut send = self
790            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
791            .await?;
792        send.send2_with_resp(cmd, version, body, timeout).await
793    }
794
795    async fn send_cmd_by_specify_tunnel(
796        &self,
797        peer_id: &PeerId,
798        tunnel_id: TunnelId,
799        cmd: CMD,
800        version: u8,
801        body: CmdBody,
802    ) -> CmdResult<()> {
803        let mut send = self
804            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
805            .await?;
806        send.send_cmd(cmd, version, body).await
807    }
808
809    async fn send_cmd_by_specify_tunnel_with_resp(
810        &self,
811        peer_id: &PeerId,
812        tunnel_id: TunnelId,
813        cmd: CMD,
814        version: u8,
815        body: CmdBody,
816        timeout: Duration,
817    ) -> CmdResult<CmdBody> {
818        let mut send = self
819            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
820            .await?;
821        send.send_cmd_with_resp(cmd, version, body, timeout).await
822    }
823
824    async fn clear_all_tunnel(&self) {
825        self.tunnel_pool.clear_all_worker().await;
826    }
827
828    async fn get_send(
829        &self,
830        peer_id: &PeerId,
831        tunnel_id: TunnelId,
832    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
833        Ok(ClassifiedSendGuard {
834            worker_guard: self
835                .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
836                .await?,
837            _p: Default::default(),
838        })
839    }
840}
841
842#[async_trait::async_trait]
843impl<
844    C: WorkerClassification,
845    M: CmdTunnelMeta,
846    R: ClassifiedCmdTunnelRead<C, M>,
847    W: ClassifiedCmdTunnelWrite<C, M>,
848    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
849    LEN: RawEncode
850        + for<'a> RawDecode<'a>
851        + Copy
852        + Send
853        + Sync
854        + 'static
855        + FromPrimitive
856        + ToPrimitive
857        + RawFixedBytes,
858    CMD: RawEncode
859        + for<'a> RawDecode<'a>
860        + Copy
861        + Send
862        + Sync
863        + 'static
864        + RawFixedBytes
865        + Eq
866        + Hash
867        + Debug,
868    LISTENER: CmdTunnelListener<M, R, W>,
869>
870    ClassifiedCmdNode<
871        LEN,
872        CMD,
873        C,
874        M,
875        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
876        ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
877    > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
878{
879    async fn send_by_classified_tunnel(
880        &self,
881        classification: C,
882        cmd: CMD,
883        version: u8,
884        body: &[u8],
885    ) -> CmdResult<()> {
886        let mut send = self.get_classified_send(classification).await?;
887        send.send(cmd, version, body).await
888    }
889
890    async fn send_by_classified_tunnel_with_resp(
891        &self,
892        classification: C,
893        cmd: CMD,
894        version: u8,
895        body: &[u8],
896        timeout: Duration,
897    ) -> CmdResult<CmdBody> {
898        let mut send = self.get_classified_send(classification).await?;
899        send.send_with_resp(cmd, version, body, timeout).await
900    }
901
902    async fn send2_by_classified_tunnel(
903        &self,
904        classification: C,
905        cmd: CMD,
906        version: u8,
907        body: &[&[u8]],
908    ) -> CmdResult<()> {
909        let mut send = self.get_classified_send(classification).await?;
910        send.send2(cmd, version, body).await
911    }
912
913    async fn send2_by_classified_tunnel_with_resp(
914        &self,
915        classification: C,
916        cmd: CMD,
917        version: u8,
918        body: &[&[u8]],
919        timeout: Duration,
920    ) -> CmdResult<CmdBody> {
921        let mut send = self.get_classified_send(classification).await?;
922        send.send2_with_resp(cmd, version, body, timeout).await
923    }
924
925    async fn send_cmd_by_classified_tunnel(
926        &self,
927        classification: C,
928        cmd: CMD,
929        version: u8,
930        body: CmdBody,
931    ) -> CmdResult<()> {
932        let mut send = self.get_classified_send(classification).await?;
933        send.send_cmd(cmd, version, body).await
934    }
935
936    async fn send_cmd_by_classified_tunnel_with_resp(
937        &self,
938        classification: C,
939        cmd: CMD,
940        version: u8,
941        body: CmdBody,
942        timeout: Duration,
943    ) -> CmdResult<CmdBody> {
944        let mut send = self.get_classified_send(classification).await?;
945        send.send_cmd_with_resp(cmd, version, body, timeout).await
946    }
947
948    async fn send_by_peer_classified_tunnel(
949        &self,
950        peer_id: &PeerId,
951        classification: C,
952        cmd: CMD,
953        version: u8,
954        body: &[u8],
955    ) -> CmdResult<()> {
956        let mut send = self
957            .get_peer_classified_send(peer_id.clone(), classification)
958            .await?;
959        send.send(cmd, version, body).await
960    }
961
962    async fn send_by_peer_classified_tunnel_with_resp(
963        &self,
964        peer_id: &PeerId,
965        classification: C,
966        cmd: CMD,
967        version: u8,
968        body: &[u8],
969        timeout: Duration,
970    ) -> CmdResult<CmdBody> {
971        let mut send = self
972            .get_peer_classified_send(peer_id.clone(), classification)
973            .await?;
974        send.send_with_resp(cmd, version, body, timeout).await
975    }
976
977    async fn send2_by_peer_classified_tunnel(
978        &self,
979        peer_id: &PeerId,
980        classification: C,
981        cmd: CMD,
982        version: u8,
983        body: &[&[u8]],
984    ) -> CmdResult<()> {
985        let mut send = self
986            .get_peer_classified_send(peer_id.clone(), classification)
987            .await?;
988        send.send2(cmd, version, body).await
989    }
990
991    async fn send2_by_peer_classified_tunnel_with_resp(
992        &self,
993        peer_id: &PeerId,
994        classification: C,
995        cmd: CMD,
996        version: u8,
997        body: &[&[u8]],
998        timeout: Duration,
999    ) -> CmdResult<CmdBody> {
1000        let mut send = self
1001            .get_peer_classified_send(peer_id.clone(), classification)
1002            .await?;
1003        send.send2_with_resp(cmd, version, body, timeout).await
1004    }
1005
1006    async fn send_cmd_by_peer_classified_tunnel(
1007        &self,
1008        peer_id: &PeerId,
1009        classification: C,
1010        cmd: CMD,
1011        version: u8,
1012        body: CmdBody,
1013    ) -> CmdResult<()> {
1014        let mut send = self
1015            .get_peer_classified_send(peer_id.clone(), classification)
1016            .await?;
1017        send.send_cmd(cmd, version, body).await
1018    }
1019
1020    async fn send_cmd_by_peer_classified_tunnel_with_resp(
1021        &self,
1022        peer_id: &PeerId,
1023        classification: C,
1024        cmd: CMD,
1025        version: u8,
1026        body: CmdBody,
1027        timeout: Duration,
1028    ) -> CmdResult<CmdBody> {
1029        let mut send = self
1030            .get_peer_classified_send(peer_id.clone(), classification)
1031            .await?;
1032        send.send_cmd_with_resp(cmd, version, body, timeout).await
1033    }
1034
1035    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId> {
1036        let send = self.get_classified_send(classification).await?;
1037        Ok(send.get_tunnel_id())
1038    }
1039
1040    async fn find_tunnel_id_by_peer_classified(
1041        &self,
1042        peer_id: &PeerId,
1043        classification: C,
1044    ) -> CmdResult<TunnelId> {
1045        let send = self
1046            .get_peer_classified_send(peer_id.clone(), classification)
1047            .await?;
1048        Ok(send.get_tunnel_id())
1049    }
1050
1051    async fn get_send_by_classified(
1052        &self,
1053        classification: C,
1054    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1055        Ok(ClassifiedSendGuard {
1056            worker_guard: self.get_classified_send(classification).await?,
1057            _p: Default::default(),
1058        })
1059    }
1060
1061    async fn get_send_by_peer_classified(
1062        &self,
1063        peer_id: &PeerId,
1064        classification: C,
1065    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1066        Ok(ClassifiedSendGuard {
1067            worker_guard: self
1068                .get_peer_classified_send(peer_id.clone(), classification)
1069                .await?,
1070            _p: Default::default(),
1071        })
1072    }
1073}