Skip to main content

sfo_cmd_server/node/
classified_node.rs

1use crate::client::{
2    ClassifiedCmdSend, ClassifiedCmdTunnelRead, ClassifiedCmdTunnelWrite, ClassifiedSendGuard,
3    RespWaiter, RespWaiterRef, gen_resp_id,
4};
5use crate::cmd::CmdHandlerMap;
6use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
7use crate::node::create_recv_handle;
8use crate::server::CmdTunnelListener;
9use crate::{
10    ClassifiedCmdNode, CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, PeerId, TunnelId,
11    TunnelIdGenerator, into_pool_err, pool_err,
12};
13use async_named_locker::ObjectHolder;
14use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
15use num::{FromPrimitive, ToPrimitive};
16use sfo_pool::{
17    ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
18    ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification,
19};
20use sfo_split::Splittable;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::hash::Hash;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27#[derive(Debug, Clone, Eq, Hash)]
28pub struct CmdNodeTunnelClassification<C: WorkerClassification> {
29    pub peer_id: Option<PeerId>,
30    pub tunnel_id: Option<TunnelId>,
31    pub classification: Option<C>,
32}
33
34impl<C: WorkerClassification> PartialEq for CmdNodeTunnelClassification<C> {
35    fn eq(&self, other: &Self) -> bool {
36        self.peer_id == other.peer_id
37            && self.tunnel_id == other.tunnel_id
38            && self.classification == other.classification
39    }
40}
41
42impl<C, M, R, W, LEN, CMD> ClassifiedWorker<CmdNodeTunnelClassification<C>>
43    for ClassifiedCmdSend<C, M, R, W, LEN, CMD>
44where
45    C: WorkerClassification,
46    M: CmdTunnelMeta,
47    R: ClassifiedCmdTunnelRead<C, M>,
48    W: ClassifiedCmdTunnelWrite<C, M>,
49    LEN: RawEncode
50        + for<'a> RawDecode<'a>
51        + Copy
52        + Send
53        + Sync
54        + 'static
55        + FromPrimitive
56        + ToPrimitive,
57    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
58{
59    fn is_work(&self) -> bool {
60        self.is_work && !self.recv_handle.is_finished()
61    }
62
63    fn is_valid(&self, c: CmdNodeTunnelClassification<C>) -> bool {
64        if c.peer_id.is_some() && c.peer_id.as_ref().unwrap() != &self.remote_id {
65            return false;
66        }
67
68        if c.tunnel_id.is_some() {
69            self.tunnel_id == c.tunnel_id.unwrap()
70        } else {
71            if c.classification.is_some() {
72                self.classification == c.classification.unwrap()
73            } else {
74                true
75            }
76        }
77    }
78
79    fn classification(&self) -> CmdNodeTunnelClassification<C> {
80        CmdNodeTunnelClassification {
81            peer_id: Some(self.remote_id.clone()),
82            tunnel_id: Some(self.tunnel_id),
83            classification: Some(self.classification.clone()),
84        }
85    }
86}
87
88#[async_trait::async_trait]
89pub trait ClassifiedCmdNodeTunnelFactory<
90    C: WorkerClassification,
91    M: CmdTunnelMeta,
92    R: ClassifiedCmdTunnelRead<C, M>,
93    W: ClassifiedCmdTunnelWrite<C, M>,
94>: Send + Sync + 'static
95{
96    async fn create_tunnel(
97        &self,
98        classification: Option<CmdNodeTunnelClassification<C>>,
99    ) -> CmdResult<Splittable<R, W>>;
100}
101
102struct CmdWriteFactoryImpl<
103    C: WorkerClassification,
104    M: CmdTunnelMeta,
105    R: ClassifiedCmdTunnelRead<C, M>,
106    W: ClassifiedCmdTunnelWrite<C, M>,
107    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
108    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
109    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
110    LISTENER: CmdTunnelListener<M, R, W>,
111> {
112    tunnel_listener: LISTENER,
113    tunnel_factory: F,
114    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
115    tunnel_id_generator: TunnelIdGenerator,
116    resp_waiter: RespWaiterRef,
117    send_cache: Arc<Mutex<HashMap<PeerId, Vec<ClassifiedCmdSend<C, M, R, W, LEN, CMD>>>>>,
118}
119
120impl<
121    C: WorkerClassification,
122    M: CmdTunnelMeta,
123    R: ClassifiedCmdTunnelRead<C, M>,
124    W: ClassifiedCmdTunnelWrite<C, M>,
125    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
126    LEN: RawEncode
127        + for<'a> RawDecode<'a>
128        + Copy
129        + Send
130        + Sync
131        + 'static
132        + FromPrimitive
133        + ToPrimitive
134        + RawFixedBytes,
135    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
136    LISTENER: CmdTunnelListener<M, R, W>,
137> CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
138{
139    pub fn new(
140        tunnel_factory: F,
141        tunnel_listener: LISTENER,
142        cmd_handler: impl CmdHandler<LEN, CMD>,
143        resp_waiter: RespWaiterRef,
144    ) -> Self {
145        Self {
146            tunnel_listener,
147            tunnel_factory,
148            cmd_handler: Arc::new(cmd_handler),
149            tunnel_id_generator: TunnelIdGenerator::new(),
150            resp_waiter,
151            send_cache: Arc::new(Mutex::new(Default::default())),
152        }
153    }
154
155    pub fn start(self: &Arc<Self>) {
156        let this = self.clone();
157        tokio::spawn(async move {
158            if let Err(e) = this.run().await {
159                log::error!("cmd server error: {:?}", e);
160            }
161        });
162    }
163
164    async fn run(self: &Arc<Self>) -> CmdResult<()> {
165        loop {
166            let tunnel = self.tunnel_listener.accept().await?;
167            let peer_id = tunnel.get_remote_peer_id();
168            let classification = tunnel.get_classification();
169            let tunnel_id = self.tunnel_id_generator.generate();
170            let this = self.clone();
171            let resp_waiter = self.resp_waiter.clone();
172            tokio::spawn(async move {
173                let ret: CmdResult<()> = async move {
174                    let this = this.clone();
175                    let cmd_handler = this.cmd_handler.clone();
176                    let (reader, writer) = tunnel.split();
177                    let tunnel_meta = reader.get_tunnel_meta();
178                    let remote_id = writer.get_remote_peer_id();
179                    let writer = ObjectHolder::new(writer);
180                    let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
181                        reader,
182                        writer.clone(),
183                        tunnel_id,
184                        cmd_handler,
185                    );
186                    {
187                        let mut send_cache = this.send_cache.lock().unwrap();
188                        let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
189                        send_list.push(ClassifiedCmdSend::new(
190                            tunnel_id,
191                            classification,
192                            recv_handle,
193                            writer,
194                            resp_waiter,
195                            remote_id,
196                            tunnel_meta,
197                        ));
198                    }
199                    Ok(())
200                }
201                .await;
202                if let Err(e) = ret {
203                    log::error!("peer connection error: {:?}", e);
204                }
205            });
206        }
207    }
208}
209
210#[async_trait::async_trait]
211impl<
212    C: WorkerClassification,
213    M: CmdTunnelMeta,
214    R: ClassifiedCmdTunnelRead<C, M>,
215    W: ClassifiedCmdTunnelWrite<C, M>,
216    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
217    LEN: RawEncode
218        + for<'a> RawDecode<'a>
219        + Copy
220        + Send
221        + Sync
222        + 'static
223        + FromPrimitive
224        + ToPrimitive
225        + RawFixedBytes,
226    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
227    LISTENER: CmdTunnelListener<M, R, W>,
228> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
229    for CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
230{
231    async fn create(
232        &self,
233        c: Option<CmdNodeTunnelClassification<C>>,
234    ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
235        if c.is_some() {
236            let classification = c.unwrap();
237            if classification.peer_id.is_some() {
238                let peer_id = classification.peer_id.clone().unwrap();
239                let tunnel_id = classification.tunnel_id;
240                if tunnel_id.is_some() {
241                    let mut send_cache = self.send_cache.lock().unwrap();
242                    if let Some(send_list) = send_cache.get_mut(&peer_id) {
243                        let mut send_index = None;
244                        for (index, send) in send_list.iter().enumerate() {
245                            if send.get_tunnel_id() == tunnel_id.unwrap() {
246                                send_index = Some(index);
247                                break;
248                            }
249                        }
250                        if let Some(send_index) = send_index {
251                            let send = send_list.remove(send_index);
252                            Ok(send)
253                        } else {
254                            Err(pool_err!(
255                                PoolErrorCode::Failed,
256                                "tunnel {:?} not found",
257                                tunnel_id.unwrap()
258                            ))
259                        }
260                    } else {
261                        Err(pool_err!(
262                            PoolErrorCode::Failed,
263                            "tunnel {:?} not found",
264                            tunnel_id.unwrap()
265                        ))
266                    }
267                } else {
268                    {
269                        let mut send_cache = self.send_cache.lock().unwrap();
270                        if let Some(send_list) = send_cache.get_mut(&peer_id) {
271                            if !send_list.is_empty() {
272                                let send = send_list.pop().unwrap();
273                                if send_list.is_empty() {
274                                    send_cache.remove(&peer_id);
275                                }
276                                return Ok(send);
277                            }
278                        }
279                    }
280                    let tunnel = self
281                        .tunnel_factory
282                        .create_tunnel(Some(classification))
283                        .await
284                        .map_err(into_pool_err!(PoolErrorCode::Failed))?;
285                    let classification = tunnel.get_classification();
286                    let tunnel_id = self.tunnel_id_generator.generate();
287                    let (recv, write) = tunnel.split();
288                    let remote_id = write.get_remote_peer_id();
289                    let tunnel_meta = recv.get_tunnel_meta();
290                    let write = ObjectHolder::new(write);
291                    let cmd_handler = self.cmd_handler.clone();
292                    let handle = create_recv_handle::<M, R, W, LEN, CMD>(
293                        recv,
294                        write.clone(),
295                        tunnel_id,
296                        cmd_handler,
297                    );
298                    Ok(ClassifiedCmdSend::new(
299                        tunnel_id,
300                        classification,
301                        handle,
302                        write,
303                        self.resp_waiter.clone(),
304                        remote_id,
305                        tunnel_meta,
306                    ))
307                }
308            } else {
309                if classification.tunnel_id.is_some() {
310                    Err(pool_err!(
311                        PoolErrorCode::Failed,
312                        "must set peer id when set tunnel id"
313                    ))
314                } else {
315                    let tunnel = self
316                        .tunnel_factory
317                        .create_tunnel(Some(classification))
318                        .await
319                        .map_err(into_pool_err!(PoolErrorCode::Failed))?;
320                    let classification = tunnel.get_classification();
321                    let tunnel_id = self.tunnel_id_generator.generate();
322                    let (recv, write) = tunnel.split();
323                    let remote_id = write.get_remote_peer_id();
324                    let tunnel_meta = write.get_tunnel_meta();
325                    let write = ObjectHolder::new(write);
326                    let cmd_handler = self.cmd_handler.clone();
327                    let handle = create_recv_handle::<M, R, W, LEN, CMD>(
328                        recv,
329                        write.clone(),
330                        tunnel_id,
331                        cmd_handler,
332                    );
333                    Ok(ClassifiedCmdSend::new(
334                        tunnel_id,
335                        classification,
336                        handle,
337                        write,
338                        self.resp_waiter.clone(),
339                        remote_id,
340                        tunnel_meta,
341                    ))
342                }
343            }
344        } else {
345            Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
346        }
347    }
348}
349
350pub struct ClassifiedCmdNodeWriteFactory<
351    C: WorkerClassification,
352    M: CmdTunnelMeta,
353    R: ClassifiedCmdTunnelRead<C, M>,
354    W: ClassifiedCmdTunnelWrite<C, M>,
355    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
356    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
357    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
358    LISTENER: CmdTunnelListener<M, R, W>,
359> {
360    inner: Arc<CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>>,
361}
362
363impl<
364    C: WorkerClassification,
365    M: CmdTunnelMeta,
366    R: ClassifiedCmdTunnelRead<C, M>,
367    W: ClassifiedCmdTunnelWrite<C, M>,
368    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
369    LEN: RawEncode
370        + for<'a> RawDecode<'a>
371        + Copy
372        + Send
373        + Sync
374        + 'static
375        + FromPrimitive
376        + ToPrimitive
377        + RawFixedBytes
378        + RawFixedBytes,
379    CMD: RawEncode
380        + for<'a> RawDecode<'a>
381        + Copy
382        + Send
383        + Sync
384        + 'static
385        + Debug
386        + RawFixedBytes
387        + RawFixedBytes,
388    LISTENER: CmdTunnelListener<M, R, W>,
389> ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
390{
391    pub(crate) fn new(
392        tunnel_factory: F,
393        tunnel_listener: LISTENER,
394        cmd_handler: impl CmdHandler<LEN, CMD>,
395        resp_waiter: RespWaiterRef,
396    ) -> Self {
397        Self {
398            inner: Arc::new(CmdWriteFactoryImpl::new(
399                tunnel_factory,
400                tunnel_listener,
401                cmd_handler,
402                resp_waiter,
403            )),
404        }
405    }
406
407    pub fn start(&self) {
408        self.inner.start();
409    }
410}
411
412#[async_trait::async_trait]
413impl<
414    C: WorkerClassification,
415    M: CmdTunnelMeta,
416    R: ClassifiedCmdTunnelRead<C, M>,
417    W: ClassifiedCmdTunnelWrite<C, M>,
418    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
419    LEN: RawEncode
420        + for<'a> RawDecode<'a>
421        + Copy
422        + Send
423        + Sync
424        + 'static
425        + FromPrimitive
426        + ToPrimitive
427        + RawFixedBytes,
428    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
429    LISTENER: CmdTunnelListener<M, R, W>,
430> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
431    for ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
432{
433    async fn create(
434        &self,
435        c: Option<CmdNodeTunnelClassification<C>>,
436    ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
437        self.inner.create(c).await
438    }
439}
440
441pub struct DefaultClassifiedCmdNode<
442    C: WorkerClassification,
443    M: CmdTunnelMeta,
444    R: ClassifiedCmdTunnelRead<C, M>,
445    W: ClassifiedCmdTunnelWrite<C, M>,
446    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
447    LEN: RawEncode
448        + for<'a> RawDecode<'a>
449        + Copy
450        + Send
451        + Sync
452        + 'static
453        + FromPrimitive
454        + ToPrimitive
455        + RawFixedBytes,
456    CMD: RawEncode
457        + for<'a> RawDecode<'a>
458        + Copy
459        + Send
460        + Sync
461        + 'static
462        + RawFixedBytes
463        + Eq
464        + Hash
465        + Debug,
466    LISTENER: CmdTunnelListener<M, R, W>,
467> {
468    tunnel_pool: ClassifiedWorkerPoolRef<
469        CmdNodeTunnelClassification<C>,
470        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
471        ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
472    >,
473    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
474}
475
476impl<
477    C: WorkerClassification,
478    M: CmdTunnelMeta,
479    R: ClassifiedCmdTunnelRead<C, M>,
480    W: ClassifiedCmdTunnelWrite<C, M>,
481    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
482    LEN: RawEncode
483        + for<'a> RawDecode<'a>
484        + Copy
485        + Send
486        + Sync
487        + 'static
488        + FromPrimitive
489        + ToPrimitive
490        + RawFixedBytes,
491    CMD: RawEncode
492        + for<'a> RawDecode<'a>
493        + Copy
494        + Send
495        + Sync
496        + 'static
497        + RawFixedBytes
498        + Eq
499        + Hash
500        + Debug,
501    LISTENER: CmdTunnelListener<M, R, W>,
502> DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
503{
504    pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
505        let cmd_handler_map = Arc::new(CmdHandlerMap::new());
506        let handler_map = cmd_handler_map.clone();
507        let resp_waiter = Arc::new(RespWaiter::new());
508        let waiter = resp_waiter.clone();
509        let write_factory = ClassifiedCmdNodeWriteFactory::<C, M, R, W, _, LEN, CMD, LISTENER>::new(
510            factory,
511            listener,
512            move |peer_id: PeerId,
513                  tunnel_id: TunnelId,
514                  header: CmdHeader<LEN, CMD>,
515                  body_read: CmdBody| {
516                let handler_map = handler_map.clone();
517                let waiter = waiter.clone();
518                async move {
519                    if header.is_resp() && header.seq().is_some() {
520                        let resp_id =
521                            gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
522                        let _ = waiter.set_result(resp_id, body_read);
523                        Ok(None)
524                    } else {
525                        if let Some(handler) = handler_map.get(header.cmd_code()) {
526                            handler.handle(peer_id, tunnel_id, header, body_read).await
527                        } else {
528                            Ok(None)
529                        }
530                    }
531                }
532            },
533            resp_waiter.clone(),
534        );
535        write_factory.start();
536        Arc::new(Self {
537            tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
538            cmd_handler_map,
539        })
540    }
541
542    async fn get_send(
543        &self,
544        peer_id: PeerId,
545    ) -> CmdResult<
546        ClassifiedWorkerGuard<
547            CmdNodeTunnelClassification<C>,
548            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
549            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
550        >,
551    > {
552        self.tunnel_pool
553            .get_classified_worker(CmdNodeTunnelClassification {
554                peer_id: Some(peer_id),
555                tunnel_id: None,
556                classification: None,
557            })
558            .await
559            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
560    }
561
562    async fn get_send_of_tunnel_id(
563        &self,
564        peer_id: PeerId,
565        tunnel_id: TunnelId,
566    ) -> CmdResult<
567        ClassifiedWorkerGuard<
568            CmdNodeTunnelClassification<C>,
569            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
570            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
571        >,
572    > {
573        self.tunnel_pool
574            .get_classified_worker(CmdNodeTunnelClassification {
575                peer_id: Some(peer_id),
576                tunnel_id: Some(tunnel_id),
577                classification: None,
578            })
579            .await
580            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
581    }
582
583    async fn get_classified_send(
584        &self,
585        classification: C,
586    ) -> CmdResult<
587        ClassifiedWorkerGuard<
588            CmdNodeTunnelClassification<C>,
589            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
590            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
591        >,
592    > {
593        self.tunnel_pool
594            .get_classified_worker(CmdNodeTunnelClassification {
595                peer_id: None,
596                tunnel_id: None,
597                classification: Some(classification),
598            })
599            .await
600            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
601    }
602
603    async fn get_peer_classified_send(
604        &self,
605        peer_id: PeerId,
606        classification: C,
607    ) -> CmdResult<
608        ClassifiedWorkerGuard<
609            CmdNodeTunnelClassification<C>,
610            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
611            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
612        >,
613    > {
614        self.tunnel_pool
615            .get_classified_worker(CmdNodeTunnelClassification {
616                peer_id: Some(peer_id),
617                tunnel_id: None,
618                classification: Some(classification),
619            })
620            .await
621            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
622    }
623}
624
625pub type ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
626    CmdNodeTunnelClassification<C>,
627    M,
628    ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
629    ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
630>;
631#[async_trait::async_trait]
632impl<
633    C: WorkerClassification,
634    M: CmdTunnelMeta,
635    R: ClassifiedCmdTunnelRead<C, M>,
636    W: ClassifiedCmdTunnelWrite<C, M>,
637    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
638    LEN: RawEncode
639        + for<'a> RawDecode<'a>
640        + Copy
641        + Send
642        + Sync
643        + 'static
644        + FromPrimitive
645        + ToPrimitive
646        + RawFixedBytes,
647    CMD: RawEncode
648        + for<'a> RawDecode<'a>
649        + Copy
650        + Send
651        + Sync
652        + 'static
653        + RawFixedBytes
654        + Eq
655        + Hash
656        + Debug,
657    LISTENER: CmdTunnelListener<M, R, W>,
658>
659    CmdNode<
660        LEN,
661        CMD,
662        M,
663        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
664        ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
665    > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
666{
667    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
668        self.cmd_handler_map.insert(cmd, handler)
669    }
670
671    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
672        let mut send = self.get_send(peer_id.clone()).await?;
673        send.send(cmd, version, body).await
674    }
675
676    async fn send_with_resp(
677        &self,
678        peer_id: &PeerId,
679        cmd: CMD,
680        version: u8,
681        body: &[u8],
682        timeout: Duration,
683    ) -> CmdResult<CmdBody> {
684        let mut send = self.get_send(peer_id.clone()).await?;
685        send.send_with_resp(cmd, version, body, timeout).await
686    }
687
688    async fn send2(
689        &self,
690        peer_id: &PeerId,
691        cmd: CMD,
692        version: u8,
693        body: &[&[u8]],
694    ) -> CmdResult<()> {
695        let mut send = self.get_send(peer_id.clone()).await?;
696        send.send2(cmd, version, body).await
697    }
698
699    async fn send2_with_resp(
700        &self,
701        peer_id: &PeerId,
702        cmd: CMD,
703        version: u8,
704        body: &[&[u8]],
705        timeout: Duration,
706    ) -> CmdResult<CmdBody> {
707        let mut send = self.get_send(peer_id.clone()).await?;
708        send.send2_with_resp(cmd, version, body, timeout).await
709    }
710
711    async fn send_cmd(
712        &self,
713        peer_id: &PeerId,
714        cmd: CMD,
715        version: u8,
716        body: CmdBody,
717    ) -> CmdResult<()> {
718        let mut send = self.get_send(peer_id.clone()).await?;
719        send.send_cmd(cmd, version, body).await
720    }
721
722    async fn send_cmd_with_resp(
723        &self,
724        peer_id: &PeerId,
725        cmd: CMD,
726        version: u8,
727        body: CmdBody,
728        timeout: Duration,
729    ) -> CmdResult<CmdBody> {
730        let mut send = self.get_send(peer_id.clone()).await?;
731        send.send_cmd_with_resp(cmd, version, body, timeout).await
732    }
733
734    async fn send_by_specify_tunnel(
735        &self,
736        peer_id: &PeerId,
737        tunnel_id: TunnelId,
738        cmd: CMD,
739        version: u8,
740        body: &[u8],
741    ) -> CmdResult<()> {
742        let mut send = self
743            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
744            .await?;
745        send.send(cmd, version, body).await
746    }
747
748    async fn send_by_specify_tunnel_with_resp(
749        &self,
750        peer_id: &PeerId,
751        tunnel_id: TunnelId,
752        cmd: CMD,
753        version: u8,
754        body: &[u8],
755        timeout: Duration,
756    ) -> CmdResult<CmdBody> {
757        let mut send = self
758            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
759            .await?;
760        send.send_with_resp(cmd, version, body, timeout).await
761    }
762
763    async fn send2_by_specify_tunnel(
764        &self,
765        peer_id: &PeerId,
766        tunnel_id: TunnelId,
767        cmd: CMD,
768        version: u8,
769        body: &[&[u8]],
770    ) -> CmdResult<()> {
771        let mut send = self
772            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
773            .await?;
774        send.send2(cmd, version, body).await
775    }
776
777    async fn send2_by_specify_tunnel_with_resp(
778        &self,
779        peer_id: &PeerId,
780        tunnel_id: TunnelId,
781        cmd: CMD,
782        version: u8,
783        body: &[&[u8]],
784        timeout: Duration,
785    ) -> CmdResult<CmdBody> {
786        let mut send = self
787            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
788            .await?;
789        send.send2_with_resp(cmd, version, body, timeout).await
790    }
791
792    async fn send_cmd_by_specify_tunnel(
793        &self,
794        peer_id: &PeerId,
795        tunnel_id: TunnelId,
796        cmd: CMD,
797        version: u8,
798        body: CmdBody,
799    ) -> CmdResult<()> {
800        let mut send = self
801            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
802            .await?;
803        send.send_cmd(cmd, version, body).await
804    }
805
806    async fn send_cmd_by_specify_tunnel_with_resp(
807        &self,
808        peer_id: &PeerId,
809        tunnel_id: TunnelId,
810        cmd: CMD,
811        version: u8,
812        body: CmdBody,
813        timeout: Duration,
814    ) -> CmdResult<CmdBody> {
815        let mut send = self
816            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
817            .await?;
818        send.send_cmd_with_resp(cmd, version, body, timeout).await
819    }
820
821    async fn clear_all_tunnel(&self) {
822        self.tunnel_pool.clear_all_worker().await;
823    }
824
825    async fn get_send(
826        &self,
827        peer_id: &PeerId,
828        tunnel_id: TunnelId,
829    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
830        Ok(ClassifiedSendGuard {
831            worker_guard: self
832                .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
833                .await?,
834            _p: Default::default(),
835        })
836    }
837}
838
839#[async_trait::async_trait]
840impl<
841    C: WorkerClassification,
842    M: CmdTunnelMeta,
843    R: ClassifiedCmdTunnelRead<C, M>,
844    W: ClassifiedCmdTunnelWrite<C, M>,
845    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
846    LEN: RawEncode
847        + for<'a> RawDecode<'a>
848        + Copy
849        + Send
850        + Sync
851        + 'static
852        + FromPrimitive
853        + ToPrimitive
854        + RawFixedBytes,
855    CMD: RawEncode
856        + for<'a> RawDecode<'a>
857        + Copy
858        + Send
859        + Sync
860        + 'static
861        + RawFixedBytes
862        + Eq
863        + Hash
864        + Debug,
865    LISTENER: CmdTunnelListener<M, R, W>,
866>
867    ClassifiedCmdNode<
868        LEN,
869        CMD,
870        C,
871        M,
872        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
873        ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
874    > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
875{
876    async fn send_by_classified_tunnel(
877        &self,
878        classification: C,
879        cmd: CMD,
880        version: u8,
881        body: &[u8],
882    ) -> CmdResult<()> {
883        let mut send = self.get_classified_send(classification).await?;
884        send.send(cmd, version, body).await
885    }
886
887    async fn send_by_classified_tunnel_with_resp(
888        &self,
889        classification: C,
890        cmd: CMD,
891        version: u8,
892        body: &[u8],
893        timeout: Duration,
894    ) -> CmdResult<CmdBody> {
895        let mut send = self.get_classified_send(classification).await?;
896        send.send_with_resp(cmd, version, body, timeout).await
897    }
898
899    async fn send2_by_classified_tunnel(
900        &self,
901        classification: C,
902        cmd: CMD,
903        version: u8,
904        body: &[&[u8]],
905    ) -> CmdResult<()> {
906        let mut send = self.get_classified_send(classification).await?;
907        send.send2(cmd, version, body).await
908    }
909
910    async fn send2_by_classified_tunnel_with_resp(
911        &self,
912        classification: C,
913        cmd: CMD,
914        version: u8,
915        body: &[&[u8]],
916        timeout: Duration,
917    ) -> CmdResult<CmdBody> {
918        let mut send = self.get_classified_send(classification).await?;
919        send.send2_with_resp(cmd, version, body, timeout).await
920    }
921
922    async fn send_cmd_by_classified_tunnel(
923        &self,
924        classification: C,
925        cmd: CMD,
926        version: u8,
927        body: CmdBody,
928    ) -> CmdResult<()> {
929        let mut send = self.get_classified_send(classification).await?;
930        send.send_cmd(cmd, version, body).await
931    }
932
933    async fn send_cmd_by_classified_tunnel_with_resp(
934        &self,
935        classification: C,
936        cmd: CMD,
937        version: u8,
938        body: CmdBody,
939        timeout: Duration,
940    ) -> CmdResult<CmdBody> {
941        let mut send = self.get_classified_send(classification).await?;
942        send.send_cmd_with_resp(cmd, version, body, timeout).await
943    }
944
945    async fn send_by_peer_classified_tunnel(
946        &self,
947        peer_id: &PeerId,
948        classification: C,
949        cmd: CMD,
950        version: u8,
951        body: &[u8],
952    ) -> CmdResult<()> {
953        let mut send = self
954            .get_peer_classified_send(peer_id.clone(), classification)
955            .await?;
956        send.send(cmd, version, body).await
957    }
958
959    async fn send_by_peer_classified_tunnel_with_resp(
960        &self,
961        peer_id: &PeerId,
962        classification: C,
963        cmd: CMD,
964        version: u8,
965        body: &[u8],
966        timeout: Duration,
967    ) -> CmdResult<CmdBody> {
968        let mut send = self
969            .get_peer_classified_send(peer_id.clone(), classification)
970            .await?;
971        send.send_with_resp(cmd, version, body, timeout).await
972    }
973
974    async fn send2_by_peer_classified_tunnel(
975        &self,
976        peer_id: &PeerId,
977        classification: C,
978        cmd: CMD,
979        version: u8,
980        body: &[&[u8]],
981    ) -> CmdResult<()> {
982        let mut send = self
983            .get_peer_classified_send(peer_id.clone(), classification)
984            .await?;
985        send.send2(cmd, version, body).await
986    }
987
988    async fn send2_by_peer_classified_tunnel_with_resp(
989        &self,
990        peer_id: &PeerId,
991        classification: C,
992        cmd: CMD,
993        version: u8,
994        body: &[&[u8]],
995        timeout: Duration,
996    ) -> CmdResult<CmdBody> {
997        let mut send = self
998            .get_peer_classified_send(peer_id.clone(), classification)
999            .await?;
1000        send.send2_with_resp(cmd, version, body, timeout).await
1001    }
1002
1003    async fn send_cmd_by_peer_classified_tunnel(
1004        &self,
1005        peer_id: &PeerId,
1006        classification: C,
1007        cmd: CMD,
1008        version: u8,
1009        body: CmdBody,
1010    ) -> CmdResult<()> {
1011        let mut send = self
1012            .get_peer_classified_send(peer_id.clone(), classification)
1013            .await?;
1014        send.send_cmd(cmd, version, body).await
1015    }
1016
1017    async fn send_cmd_by_peer_classified_tunnel_with_resp(
1018        &self,
1019        peer_id: &PeerId,
1020        classification: C,
1021        cmd: CMD,
1022        version: u8,
1023        body: CmdBody,
1024        timeout: Duration,
1025    ) -> CmdResult<CmdBody> {
1026        let mut send = self
1027            .get_peer_classified_send(peer_id.clone(), classification)
1028            .await?;
1029        send.send_cmd_with_resp(cmd, version, body, timeout).await
1030    }
1031
1032    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId> {
1033        let send = self.get_classified_send(classification).await?;
1034        Ok(send.get_tunnel_id())
1035    }
1036
1037    async fn find_tunnel_id_by_peer_classified(
1038        &self,
1039        peer_id: &PeerId,
1040        classification: C,
1041    ) -> CmdResult<TunnelId> {
1042        let send = self
1043            .get_peer_classified_send(peer_id.clone(), classification)
1044            .await?;
1045        Ok(send.get_tunnel_id())
1046    }
1047
1048    async fn get_send_by_classified(
1049        &self,
1050        classification: C,
1051    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1052        Ok(ClassifiedSendGuard {
1053            worker_guard: self.get_classified_send(classification).await?,
1054            _p: Default::default(),
1055        })
1056    }
1057
1058    async fn get_send_by_peer_classified(
1059        &self,
1060        peer_id: &PeerId,
1061        classification: C,
1062    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1063        Ok(ClassifiedSendGuard {
1064            worker_guard: self
1065                .get_peer_classified_send(peer_id.clone(), classification)
1066                .await?,
1067            _p: Default::default(),
1068        })
1069    }
1070}