Skip to main content

sfo_cmd_server/node/
classified_node.rs

1use crate::client::{
2    ClassifiedCmdSend, ClassifiedCmdTunnelRead, ClassifiedCmdTunnelWrite, ClassifiedSendGuard,
3    RespWaiter, RespWaiterRef, gen_resp_id,
4};
5use crate::cmd::CmdHandlerMap;
6use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
7use crate::node::create_recv_handle;
8use crate::server::CmdTunnelListener;
9use crate::{
10    ClassifiedCmdNode, CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, PeerId, TunnelId,
11    TunnelIdGenerator, into_pool_err, pool_err,
12};
13use async_named_locker::ObjectHolder;
14use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
15use num::{FromPrimitive, ToPrimitive};
16use sfo_pool::{
17    ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
18    ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification,
19};
20use sfo_split::Splittable;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::hash::Hash;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27#[derive(Debug, Clone, Eq, Hash)]
28pub struct CmdNodeTunnelClassification<C: WorkerClassification> {
29    pub peer_id: Option<PeerId>,
30    pub tunnel_id: Option<TunnelId>,
31    pub classification: Option<C>,
32}
33
34impl<C: WorkerClassification> PartialEq for CmdNodeTunnelClassification<C> {
35    fn eq(&self, other: &Self) -> bool {
36        self.peer_id == other.peer_id
37            && self.tunnel_id == other.tunnel_id
38            && self.classification == other.classification
39    }
40}
41
42impl<C, M, R, W, LEN, CMD> ClassifiedWorker<CmdNodeTunnelClassification<C>>
43    for ClassifiedCmdSend<C, M, R, W, LEN, CMD>
44where
45    C: WorkerClassification,
46    M: CmdTunnelMeta,
47    R: ClassifiedCmdTunnelRead<C, M>,
48    W: ClassifiedCmdTunnelWrite<C, M>,
49    LEN: RawEncode
50        + for<'a> RawDecode<'a>
51        + Copy
52        + Send
53        + Sync
54        + 'static
55        + FromPrimitive
56        + ToPrimitive,
57    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
58{
59    fn is_work(&self) -> bool {
60        self.is_work && !self.recv_handle.is_finished()
61    }
62
63    fn is_valid(&self, c: CmdNodeTunnelClassification<C>) -> bool {
64        if c.peer_id.is_some() && c.peer_id.as_ref().unwrap() != &self.remote_id {
65            return false;
66        }
67
68        if c.tunnel_id.is_some() {
69            self.tunnel_id == c.tunnel_id.unwrap()
70        } else {
71            if c.classification.is_some() {
72                self.classification == c.classification.unwrap()
73            } else {
74                true
75            }
76        }
77    }
78
79    fn classification(&self) -> CmdNodeTunnelClassification<C> {
80        CmdNodeTunnelClassification {
81            peer_id: self.pool_peer_id.clone(),
82            tunnel_id: self.pool_tunnel_id,
83            classification: self.pool_classification.clone(),
84        }
85    }
86}
87
88#[async_trait::async_trait]
89pub trait ClassifiedCmdNodeTunnelFactory<
90    C: WorkerClassification,
91    M: CmdTunnelMeta,
92    R: ClassifiedCmdTunnelRead<C, M>,
93    W: ClassifiedCmdTunnelWrite<C, M>,
94>: Send + Sync + 'static
95{
96    async fn create_tunnel(
97        &self,
98        classification: Option<CmdNodeTunnelClassification<C>>,
99    ) -> CmdResult<Splittable<R, W>>;
100}
101
102struct CmdWriteFactoryImpl<
103    C: WorkerClassification,
104    M: CmdTunnelMeta,
105    R: ClassifiedCmdTunnelRead<C, M>,
106    W: ClassifiedCmdTunnelWrite<C, M>,
107    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
108    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
109    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
110    LISTENER: CmdTunnelListener<M, R, W>,
111> {
112    tunnel_listener: LISTENER,
113    tunnel_factory: F,
114    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
115    tunnel_id_generator: TunnelIdGenerator,
116    resp_waiter: RespWaiterRef,
117    send_cache: Arc<Mutex<HashMap<PeerId, Vec<ClassifiedCmdSend<C, M, R, W, LEN, CMD>>>>>,
118}
119
120impl<
121    C: WorkerClassification,
122    M: CmdTunnelMeta,
123    R: ClassifiedCmdTunnelRead<C, M>,
124    W: ClassifiedCmdTunnelWrite<C, M>,
125    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
126    LEN: RawEncode
127        + for<'a> RawDecode<'a>
128        + Copy
129        + Send
130        + Sync
131        + 'static
132        + FromPrimitive
133        + ToPrimitive
134        + RawFixedBytes,
135    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
136    LISTENER: CmdTunnelListener<M, R, W>,
137> CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
138{
139    pub fn new(
140        tunnel_factory: F,
141        tunnel_listener: LISTENER,
142        cmd_handler: impl CmdHandler<LEN, CMD>,
143        resp_waiter: RespWaiterRef,
144    ) -> Self {
145        Self {
146            tunnel_listener,
147            tunnel_factory,
148            cmd_handler: Arc::new(cmd_handler),
149            tunnel_id_generator: TunnelIdGenerator::new(),
150            resp_waiter,
151            send_cache: Arc::new(Mutex::new(Default::default())),
152        }
153    }
154
155    pub fn start(self: &Arc<Self>) {
156        let this = self.clone();
157        tokio::spawn(async move {
158            if let Err(e) = this.run().await {
159                log::error!("cmd server error: {:?}", e);
160            }
161        });
162    }
163
164    async fn run(self: &Arc<Self>) -> CmdResult<()> {
165        loop {
166            let tunnel = self.tunnel_listener.accept().await?;
167            let peer_id = tunnel.get_remote_peer_id();
168            let classification = tunnel.get_classification();
169            let tunnel_id = self.tunnel_id_generator.generate();
170            let this = self.clone();
171            let resp_waiter = self.resp_waiter.clone();
172            tokio::spawn(async move {
173                let ret: CmdResult<()> = async move {
174                    let this = this.clone();
175                    let cmd_handler = this.cmd_handler.clone();
176                    let (reader, writer) = tunnel.split();
177                    let tunnel_meta = reader.get_tunnel_meta();
178                    let remote_id = writer.get_remote_peer_id();
179                    let writer = ObjectHolder::new(writer);
180                    let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
181                        reader,
182                        writer.clone(),
183                        tunnel_id,
184                        cmd_handler,
185                    );
186                    {
187                        let mut send_cache = this.send_cache.lock().unwrap();
188                        let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
189                        send_list.push(ClassifiedCmdSend::new(
190                            tunnel_id,
191                            classification.clone(),
192                            Some(remote_id.clone()),
193                            Some(tunnel_id),
194                            Some(classification),
195                            recv_handle,
196                            writer,
197                            resp_waiter,
198                            remote_id,
199                            tunnel_meta,
200                        ));
201                    }
202                    Ok(())
203                }
204                .await;
205                if let Err(e) = ret {
206                    log::error!("peer connection error: {:?}", e);
207                }
208            });
209        }
210    }
211}
212
213#[async_trait::async_trait]
214impl<
215    C: WorkerClassification,
216    M: CmdTunnelMeta,
217    R: ClassifiedCmdTunnelRead<C, M>,
218    W: ClassifiedCmdTunnelWrite<C, M>,
219    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
220    LEN: RawEncode
221        + for<'a> RawDecode<'a>
222        + Copy
223        + Send
224        + Sync
225        + 'static
226        + FromPrimitive
227        + ToPrimitive
228        + RawFixedBytes,
229    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
230    LISTENER: CmdTunnelListener<M, R, W>,
231> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
232    for CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
233{
234    async fn create(
235        &self,
236        c: Option<CmdNodeTunnelClassification<C>>,
237    ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
238        if c.is_some() {
239            let classification = c.unwrap();
240            if classification.peer_id.is_some() {
241                let peer_id = classification.peer_id.clone().unwrap();
242                let tunnel_id = classification.tunnel_id;
243                if tunnel_id.is_some() {
244                    let mut send_cache = self.send_cache.lock().unwrap();
245                    if let Some(send_list) = send_cache.get_mut(&peer_id) {
246                        let mut send_index = None;
247                        for (index, send) in send_list.iter().enumerate() {
248                            if send.get_tunnel_id() == tunnel_id.unwrap() {
249                                send_index = Some(index);
250                                break;
251                            }
252                        }
253                        if let Some(send_index) = send_index {
254                            let mut send = send_list.remove(send_index);
255                            send.set_pool_key(
256                                Some(peer_id.clone()),
257                                tunnel_id,
258                                classification.classification.clone(),
259                            );
260                            Ok(send)
261                        } else {
262                            Err(pool_err!(
263                                PoolErrorCode::Failed,
264                                "tunnel {:?} not found",
265                                tunnel_id.unwrap()
266                            ))
267                        }
268                    } else {
269                        Err(pool_err!(
270                            PoolErrorCode::Failed,
271                            "tunnel {:?} not found",
272                            tunnel_id.unwrap()
273                        ))
274                    }
275                } else {
276                    {
277                        let mut send_cache = self.send_cache.lock().unwrap();
278                        if let Some(send_list) = send_cache.get_mut(&peer_id) {
279                            if !send_list.is_empty() {
280                                let mut send = send_list.pop().unwrap();
281                                send.set_pool_key(
282                                    Some(peer_id.clone()),
283                                    tunnel_id,
284                                    classification.classification.clone(),
285                                );
286                                if send_list.is_empty() {
287                                    send_cache.remove(&peer_id);
288                                }
289                                return Ok(send);
290                            }
291                        }
292                    }
293                    let tunnel = self
294                        .tunnel_factory
295                        .create_tunnel(Some(classification.clone()))
296                        .await
297                        .map_err(into_pool_err!(PoolErrorCode::Failed))?;
298                    let actual_classification = tunnel.get_classification();
299                    let pool_peer_id = classification.peer_id.clone();
300                    let pool_tunnel_id = classification.tunnel_id;
301                    let pool_classification = classification.classification.clone();
302                    let tunnel_id = self.tunnel_id_generator.generate();
303                    let (recv, write) = tunnel.split();
304                    let remote_id = write.get_remote_peer_id();
305                    let tunnel_meta = recv.get_tunnel_meta();
306                    let write = ObjectHolder::new(write);
307                    let cmd_handler = self.cmd_handler.clone();
308                    let handle = create_recv_handle::<M, R, W, LEN, CMD>(
309                        recv,
310                        write.clone(),
311                        tunnel_id,
312                        cmd_handler,
313                    );
314                    Ok(ClassifiedCmdSend::new(
315                        tunnel_id,
316                        actual_classification,
317                        pool_peer_id,
318                        pool_tunnel_id,
319                        pool_classification,
320                        handle,
321                        write,
322                        self.resp_waiter.clone(),
323                        remote_id,
324                        tunnel_meta,
325                    ))
326                }
327            } else {
328                if classification.tunnel_id.is_some() {
329                    Err(pool_err!(
330                        PoolErrorCode::Failed,
331                        "must set peer id when set tunnel id"
332                    ))
333                } else {
334                    let tunnel = self
335                        .tunnel_factory
336                        .create_tunnel(Some(classification.clone()))
337                        .await
338                        .map_err(into_pool_err!(PoolErrorCode::Failed))?;
339                    let actual_classification = tunnel.get_classification();
340                    let pool_peer_id = classification.peer_id.clone();
341                    let pool_tunnel_id = classification.tunnel_id;
342                    let pool_classification = classification.classification.clone();
343                    let tunnel_id = self.tunnel_id_generator.generate();
344                    let (recv, write) = tunnel.split();
345                    let remote_id = write.get_remote_peer_id();
346                    let tunnel_meta = write.get_tunnel_meta();
347                    let write = ObjectHolder::new(write);
348                    let cmd_handler = self.cmd_handler.clone();
349                    let handle = create_recv_handle::<M, R, W, LEN, CMD>(
350                        recv,
351                        write.clone(),
352                        tunnel_id,
353                        cmd_handler,
354                    );
355                    Ok(ClassifiedCmdSend::new(
356                        tunnel_id,
357                        actual_classification,
358                        pool_peer_id,
359                        pool_tunnel_id,
360                        pool_classification,
361                        handle,
362                        write,
363                        self.resp_waiter.clone(),
364                        remote_id,
365                        tunnel_meta,
366                    ))
367                }
368            }
369        } else {
370            Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
371        }
372    }
373}
374
375pub struct ClassifiedCmdNodeWriteFactory<
376    C: WorkerClassification,
377    M: CmdTunnelMeta,
378    R: ClassifiedCmdTunnelRead<C, M>,
379    W: ClassifiedCmdTunnelWrite<C, M>,
380    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
381    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
382    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
383    LISTENER: CmdTunnelListener<M, R, W>,
384> {
385    inner: Arc<CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>>,
386}
387
388impl<
389    C: WorkerClassification,
390    M: CmdTunnelMeta,
391    R: ClassifiedCmdTunnelRead<C, M>,
392    W: ClassifiedCmdTunnelWrite<C, M>,
393    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
394    LEN: RawEncode
395        + for<'a> RawDecode<'a>
396        + Copy
397        + Send
398        + Sync
399        + 'static
400        + FromPrimitive
401        + ToPrimitive
402        + RawFixedBytes
403        + RawFixedBytes,
404    CMD: RawEncode
405        + for<'a> RawDecode<'a>
406        + Copy
407        + Send
408        + Sync
409        + 'static
410        + Debug
411        + RawFixedBytes
412        + RawFixedBytes,
413    LISTENER: CmdTunnelListener<M, R, W>,
414> ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
415{
416    pub(crate) fn new(
417        tunnel_factory: F,
418        tunnel_listener: LISTENER,
419        cmd_handler: impl CmdHandler<LEN, CMD>,
420        resp_waiter: RespWaiterRef,
421    ) -> Self {
422        Self {
423            inner: Arc::new(CmdWriteFactoryImpl::new(
424                tunnel_factory,
425                tunnel_listener,
426                cmd_handler,
427                resp_waiter,
428            )),
429        }
430    }
431
432    pub fn start(&self) {
433        self.inner.start();
434    }
435}
436
437#[async_trait::async_trait]
438impl<
439    C: WorkerClassification,
440    M: CmdTunnelMeta,
441    R: ClassifiedCmdTunnelRead<C, M>,
442    W: ClassifiedCmdTunnelWrite<C, M>,
443    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
444    LEN: RawEncode
445        + for<'a> RawDecode<'a>
446        + Copy
447        + Send
448        + Sync
449        + 'static
450        + FromPrimitive
451        + ToPrimitive
452        + RawFixedBytes,
453    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
454    LISTENER: CmdTunnelListener<M, R, W>,
455> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
456    for ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
457{
458    async fn create(
459        &self,
460        c: Option<CmdNodeTunnelClassification<C>>,
461    ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
462        self.inner.create(c).await
463    }
464}
465
466pub struct DefaultClassifiedCmdNode<
467    C: WorkerClassification,
468    M: CmdTunnelMeta,
469    R: ClassifiedCmdTunnelRead<C, M>,
470    W: ClassifiedCmdTunnelWrite<C, M>,
471    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
472    LEN: RawEncode
473        + for<'a> RawDecode<'a>
474        + Copy
475        + Send
476        + Sync
477        + 'static
478        + FromPrimitive
479        + ToPrimitive
480        + RawFixedBytes,
481    CMD: RawEncode
482        + for<'a> RawDecode<'a>
483        + Copy
484        + Send
485        + Sync
486        + 'static
487        + RawFixedBytes
488        + Eq
489        + Hash
490        + Debug,
491    LISTENER: CmdTunnelListener<M, R, W>,
492> {
493    tunnel_pool: ClassifiedWorkerPoolRef<
494        CmdNodeTunnelClassification<C>,
495        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
496        ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
497    >,
498    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
499}
500
501impl<
502    C: WorkerClassification,
503    M: CmdTunnelMeta,
504    R: ClassifiedCmdTunnelRead<C, M>,
505    W: ClassifiedCmdTunnelWrite<C, M>,
506    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
507    LEN: RawEncode
508        + for<'a> RawDecode<'a>
509        + Copy
510        + Send
511        + Sync
512        + 'static
513        + FromPrimitive
514        + ToPrimitive
515        + RawFixedBytes,
516    CMD: RawEncode
517        + for<'a> RawDecode<'a>
518        + Copy
519        + Send
520        + Sync
521        + 'static
522        + RawFixedBytes
523        + Eq
524        + Hash
525        + Debug,
526    LISTENER: CmdTunnelListener<M, R, W>,
527> DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
528{
529    pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
530        let cmd_handler_map = Arc::new(CmdHandlerMap::new());
531        let handler_map = cmd_handler_map.clone();
532        let resp_waiter = Arc::new(RespWaiter::new());
533        let waiter = resp_waiter.clone();
534        let write_factory = ClassifiedCmdNodeWriteFactory::<C, M, R, W, _, LEN, CMD, LISTENER>::new(
535            factory,
536            listener,
537            move |local_id: PeerId,
538                  peer_id: PeerId,
539                  tunnel_id: TunnelId,
540                  header: CmdHeader<LEN, CMD>,
541                  body_read: CmdBody| {
542                let handler_map = handler_map.clone();
543                let waiter = waiter.clone();
544                async move {
545                    if header.is_resp() && header.seq().is_some() {
546                        let resp_id =
547                            gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
548                        let _ = waiter.set_result(resp_id, body_read);
549                        Ok(None)
550                    } else {
551                        if let Some(handler) = handler_map.get(header.cmd_code()) {
552                            handler
553                                .handle(local_id, peer_id, tunnel_id, header, body_read)
554                                .await
555                        } else {
556                            Ok(None)
557                        }
558                    }
559                }
560            },
561            resp_waiter.clone(),
562        );
563        write_factory.start();
564        Arc::new(Self {
565            tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
566            cmd_handler_map,
567        })
568    }
569
570    async fn get_send(
571        &self,
572        peer_id: PeerId,
573    ) -> CmdResult<
574        ClassifiedWorkerGuard<
575            CmdNodeTunnelClassification<C>,
576            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
577            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
578        >,
579    > {
580        self.tunnel_pool
581            .get_classified_worker(CmdNodeTunnelClassification {
582                peer_id: Some(peer_id),
583                tunnel_id: None,
584                classification: None,
585            })
586            .await
587            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
588    }
589
590    async fn get_send_of_tunnel_id(
591        &self,
592        peer_id: PeerId,
593        tunnel_id: TunnelId,
594    ) -> CmdResult<
595        ClassifiedWorkerGuard<
596            CmdNodeTunnelClassification<C>,
597            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
598            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
599        >,
600    > {
601        self.tunnel_pool
602            .get_classified_worker(CmdNodeTunnelClassification {
603                peer_id: Some(peer_id),
604                tunnel_id: Some(tunnel_id),
605                classification: None,
606            })
607            .await
608            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
609    }
610
611    async fn get_classified_send(
612        &self,
613        classification: C,
614    ) -> CmdResult<
615        ClassifiedWorkerGuard<
616            CmdNodeTunnelClassification<C>,
617            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
618            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
619        >,
620    > {
621        self.tunnel_pool
622            .get_classified_worker(CmdNodeTunnelClassification {
623                peer_id: None,
624                tunnel_id: None,
625                classification: Some(classification),
626            })
627            .await
628            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
629    }
630
631    async fn get_peer_classified_send(
632        &self,
633        peer_id: PeerId,
634        classification: C,
635    ) -> CmdResult<
636        ClassifiedWorkerGuard<
637            CmdNodeTunnelClassification<C>,
638            ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
639            ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
640        >,
641    > {
642        self.tunnel_pool
643            .get_classified_worker(CmdNodeTunnelClassification {
644                peer_id: Some(peer_id),
645                tunnel_id: None,
646                classification: Some(classification),
647            })
648            .await
649            .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
650    }
651}
652
653pub type ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
654    CmdNodeTunnelClassification<C>,
655    M,
656    ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
657    ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
658>;
659#[async_trait::async_trait]
660impl<
661    C: WorkerClassification,
662    M: CmdTunnelMeta,
663    R: ClassifiedCmdTunnelRead<C, M>,
664    W: ClassifiedCmdTunnelWrite<C, M>,
665    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
666    LEN: RawEncode
667        + for<'a> RawDecode<'a>
668        + Copy
669        + Send
670        + Sync
671        + 'static
672        + FromPrimitive
673        + ToPrimitive
674        + RawFixedBytes,
675    CMD: RawEncode
676        + for<'a> RawDecode<'a>
677        + Copy
678        + Send
679        + Sync
680        + 'static
681        + RawFixedBytes
682        + Eq
683        + Hash
684        + Debug,
685    LISTENER: CmdTunnelListener<M, R, W>,
686>
687    CmdNode<
688        LEN,
689        CMD,
690        M,
691        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
692        ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
693    > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
694{
695    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
696        self.cmd_handler_map.insert(cmd, handler)
697    }
698
699    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
700        let mut send = self.get_send(peer_id.clone()).await?;
701        send.send(cmd, version, body).await
702    }
703
704    async fn send_with_resp(
705        &self,
706        peer_id: &PeerId,
707        cmd: CMD,
708        version: u8,
709        body: &[u8],
710        timeout: Duration,
711    ) -> CmdResult<CmdBody> {
712        let mut send = self.get_send(peer_id.clone()).await?;
713        send.send_with_resp(cmd, version, body, timeout).await
714    }
715
716    async fn send_parts(
717        &self,
718        peer_id: &PeerId,
719        cmd: CMD,
720        version: u8,
721        body: &[&[u8]],
722    ) -> CmdResult<()> {
723        let mut send = self.get_send(peer_id.clone()).await?;
724        send.send_parts(cmd, version, body).await
725    }
726
727    async fn send_parts_with_resp(
728        &self,
729        peer_id: &PeerId,
730        cmd: CMD,
731        version: u8,
732        body: &[&[u8]],
733        timeout: Duration,
734    ) -> CmdResult<CmdBody> {
735        let mut send = self.get_send(peer_id.clone()).await?;
736        send.send_parts_with_resp(cmd, version, body, timeout).await
737    }
738
739    async fn send_cmd(
740        &self,
741        peer_id: &PeerId,
742        cmd: CMD,
743        version: u8,
744        body: CmdBody,
745    ) -> CmdResult<()> {
746        let mut send = self.get_send(peer_id.clone()).await?;
747        send.send_cmd(cmd, version, body).await
748    }
749
750    async fn send_cmd_with_resp(
751        &self,
752        peer_id: &PeerId,
753        cmd: CMD,
754        version: u8,
755        body: CmdBody,
756        timeout: Duration,
757    ) -> CmdResult<CmdBody> {
758        let mut send = self.get_send(peer_id.clone()).await?;
759        send.send_cmd_with_resp(cmd, version, body, timeout).await
760    }
761
762    async fn send_by_specify_tunnel(
763        &self,
764        peer_id: &PeerId,
765        tunnel_id: TunnelId,
766        cmd: CMD,
767        version: u8,
768        body: &[u8],
769    ) -> CmdResult<()> {
770        let mut send = self
771            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
772            .await?;
773        send.send(cmd, version, body).await
774    }
775
776    async fn send_by_specify_tunnel_with_resp(
777        &self,
778        peer_id: &PeerId,
779        tunnel_id: TunnelId,
780        cmd: CMD,
781        version: u8,
782        body: &[u8],
783        timeout: Duration,
784    ) -> CmdResult<CmdBody> {
785        let mut send = self
786            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
787            .await?;
788        send.send_with_resp(cmd, version, body, timeout).await
789    }
790
791    async fn send_parts_by_specify_tunnel(
792        &self,
793        peer_id: &PeerId,
794        tunnel_id: TunnelId,
795        cmd: CMD,
796        version: u8,
797        body: &[&[u8]],
798    ) -> CmdResult<()> {
799        let mut send = self
800            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
801            .await?;
802        send.send_parts(cmd, version, body).await
803    }
804
805    async fn send_parts_by_specify_tunnel_with_resp(
806        &self,
807        peer_id: &PeerId,
808        tunnel_id: TunnelId,
809        cmd: CMD,
810        version: u8,
811        body: &[&[u8]],
812        timeout: Duration,
813    ) -> CmdResult<CmdBody> {
814        let mut send = self
815            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
816            .await?;
817        send.send_parts_with_resp(cmd, version, body, timeout).await
818    }
819
820    async fn send_cmd_by_specify_tunnel(
821        &self,
822        peer_id: &PeerId,
823        tunnel_id: TunnelId,
824        cmd: CMD,
825        version: u8,
826        body: CmdBody,
827    ) -> CmdResult<()> {
828        let mut send = self
829            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
830            .await?;
831        send.send_cmd(cmd, version, body).await
832    }
833
834    async fn send_cmd_by_specify_tunnel_with_resp(
835        &self,
836        peer_id: &PeerId,
837        tunnel_id: TunnelId,
838        cmd: CMD,
839        version: u8,
840        body: CmdBody,
841        timeout: Duration,
842    ) -> CmdResult<CmdBody> {
843        let mut send = self
844            .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
845            .await?;
846        send.send_cmd_with_resp(cmd, version, body, timeout).await
847    }
848
849    async fn clear_all_tunnel(&self) {
850        self.tunnel_pool.clear_all_worker().await;
851    }
852
853    async fn get_send(
854        &self,
855        peer_id: &PeerId,
856        tunnel_id: TunnelId,
857    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
858        Ok(ClassifiedSendGuard {
859            worker_guard: self
860                .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
861                .await?,
862            _p: Default::default(),
863        })
864    }
865}
866
867#[async_trait::async_trait]
868impl<
869    C: WorkerClassification,
870    M: CmdTunnelMeta,
871    R: ClassifiedCmdTunnelRead<C, M>,
872    W: ClassifiedCmdTunnelWrite<C, M>,
873    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
874    LEN: RawEncode
875        + for<'a> RawDecode<'a>
876        + Copy
877        + Send
878        + Sync
879        + 'static
880        + FromPrimitive
881        + ToPrimitive
882        + RawFixedBytes,
883    CMD: RawEncode
884        + for<'a> RawDecode<'a>
885        + Copy
886        + Send
887        + Sync
888        + 'static
889        + RawFixedBytes
890        + Eq
891        + Hash
892        + Debug,
893    LISTENER: CmdTunnelListener<M, R, W>,
894>
895    ClassifiedCmdNode<
896        LEN,
897        CMD,
898        C,
899        M,
900        ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
901        ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
902    > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
903{
904    async fn send_by_classified_tunnel(
905        &self,
906        classification: C,
907        cmd: CMD,
908        version: u8,
909        body: &[u8],
910    ) -> CmdResult<()> {
911        let mut send = self.get_classified_send(classification).await?;
912        send.send(cmd, version, body).await
913    }
914
915    async fn send_by_classified_tunnel_with_resp(
916        &self,
917        classification: C,
918        cmd: CMD,
919        version: u8,
920        body: &[u8],
921        timeout: Duration,
922    ) -> CmdResult<CmdBody> {
923        let mut send = self.get_classified_send(classification).await?;
924        send.send_with_resp(cmd, version, body, timeout).await
925    }
926
927    async fn send_parts_by_classified_tunnel(
928        &self,
929        classification: C,
930        cmd: CMD,
931        version: u8,
932        body: &[&[u8]],
933    ) -> CmdResult<()> {
934        let mut send = self.get_classified_send(classification).await?;
935        send.send_parts(cmd, version, body).await
936    }
937
938    async fn send_parts_by_classified_tunnel_with_resp(
939        &self,
940        classification: C,
941        cmd: CMD,
942        version: u8,
943        body: &[&[u8]],
944        timeout: Duration,
945    ) -> CmdResult<CmdBody> {
946        let mut send = self.get_classified_send(classification).await?;
947        send.send_parts_with_resp(cmd, version, body, timeout).await
948    }
949
950    async fn send_cmd_by_classified_tunnel(
951        &self,
952        classification: C,
953        cmd: CMD,
954        version: u8,
955        body: CmdBody,
956    ) -> CmdResult<()> {
957        let mut send = self.get_classified_send(classification).await?;
958        send.send_cmd(cmd, version, body).await
959    }
960
961    async fn send_cmd_by_classified_tunnel_with_resp(
962        &self,
963        classification: C,
964        cmd: CMD,
965        version: u8,
966        body: CmdBody,
967        timeout: Duration,
968    ) -> CmdResult<CmdBody> {
969        let mut send = self.get_classified_send(classification).await?;
970        send.send_cmd_with_resp(cmd, version, body, timeout).await
971    }
972
973    async fn send_by_peer_classified_tunnel(
974        &self,
975        peer_id: &PeerId,
976        classification: C,
977        cmd: CMD,
978        version: u8,
979        body: &[u8],
980    ) -> CmdResult<()> {
981        let mut send = self
982            .get_peer_classified_send(peer_id.clone(), classification)
983            .await?;
984        send.send(cmd, version, body).await
985    }
986
987    async fn send_by_peer_classified_tunnel_with_resp(
988        &self,
989        peer_id: &PeerId,
990        classification: C,
991        cmd: CMD,
992        version: u8,
993        body: &[u8],
994        timeout: Duration,
995    ) -> CmdResult<CmdBody> {
996        let mut send = self
997            .get_peer_classified_send(peer_id.clone(), classification)
998            .await?;
999        send.send_with_resp(cmd, version, body, timeout).await
1000    }
1001
1002    async fn send_parts_by_peer_classified_tunnel(
1003        &self,
1004        peer_id: &PeerId,
1005        classification: C,
1006        cmd: CMD,
1007        version: u8,
1008        body: &[&[u8]],
1009    ) -> CmdResult<()> {
1010        let mut send = self
1011            .get_peer_classified_send(peer_id.clone(), classification)
1012            .await?;
1013        send.send_parts(cmd, version, body).await
1014    }
1015
1016    async fn send_parts_by_peer_classified_tunnel_with_resp(
1017        &self,
1018        peer_id: &PeerId,
1019        classification: C,
1020        cmd: CMD,
1021        version: u8,
1022        body: &[&[u8]],
1023        timeout: Duration,
1024    ) -> CmdResult<CmdBody> {
1025        let mut send = self
1026            .get_peer_classified_send(peer_id.clone(), classification)
1027            .await?;
1028        send.send_parts_with_resp(cmd, version, body, timeout).await
1029    }
1030
1031    async fn send_cmd_by_peer_classified_tunnel(
1032        &self,
1033        peer_id: &PeerId,
1034        classification: C,
1035        cmd: CMD,
1036        version: u8,
1037        body: CmdBody,
1038    ) -> CmdResult<()> {
1039        let mut send = self
1040            .get_peer_classified_send(peer_id.clone(), classification)
1041            .await?;
1042        send.send_cmd(cmd, version, body).await
1043    }
1044
1045    async fn send_cmd_by_peer_classified_tunnel_with_resp(
1046        &self,
1047        peer_id: &PeerId,
1048        classification: C,
1049        cmd: CMD,
1050        version: u8,
1051        body: CmdBody,
1052        timeout: Duration,
1053    ) -> CmdResult<CmdBody> {
1054        let mut send = self
1055            .get_peer_classified_send(peer_id.clone(), classification)
1056            .await?;
1057        send.send_cmd_with_resp(cmd, version, body, timeout).await
1058    }
1059
1060    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId> {
1061        let send = self.get_classified_send(classification).await?;
1062        Ok(send.get_tunnel_id())
1063    }
1064
1065    async fn find_tunnel_id_by_peer_classified(
1066        &self,
1067        peer_id: &PeerId,
1068        classification: C,
1069    ) -> CmdResult<TunnelId> {
1070        let send = self
1071            .get_peer_classified_send(peer_id.clone(), classification)
1072            .await?;
1073        Ok(send.get_tunnel_id())
1074    }
1075
1076    async fn get_send_by_classified(
1077        &self,
1078        classification: C,
1079    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1080        Ok(ClassifiedSendGuard {
1081            worker_guard: self.get_classified_send(classification).await?,
1082            _p: Default::default(),
1083        })
1084    }
1085
1086    async fn get_send_by_peer_classified(
1087        &self,
1088        peer_id: &PeerId,
1089        classification: C,
1090    ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1091        Ok(ClassifiedSendGuard {
1092            worker_guard: self
1093                .get_peer_classified_send(peer_id.clone(), classification)
1094                .await?,
1095            _p: Default::default(),
1096        })
1097    }
1098}