1use crate::client::{
2 ClassifiedCmdSend, ClassifiedCmdTunnelRead, ClassifiedCmdTunnelWrite, ClassifiedSendGuard,
3 RespWaiter, RespWaiterRef, gen_resp_id,
4};
5use crate::cmd::CmdHandlerMap;
6use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
7use crate::node::create_recv_handle;
8use crate::server::CmdTunnelListener;
9use crate::{
10 ClassifiedCmdNode, CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, PeerId, TunnelId,
11 TunnelIdGenerator, into_pool_err, pool_err,
12};
13use async_named_locker::ObjectHolder;
14use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
15use num::{FromPrimitive, ToPrimitive};
16use sfo_pool::{
17 ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
18 ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification,
19};
20use sfo_split::Splittable;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::hash::Hash;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27#[derive(Debug, Clone, Eq, Hash)]
28pub struct CmdNodeTunnelClassification<C: WorkerClassification> {
29 pub peer_id: Option<PeerId>,
30 pub tunnel_id: Option<TunnelId>,
31 pub classification: Option<C>,
32}
33
34impl<C: WorkerClassification> PartialEq for CmdNodeTunnelClassification<C> {
35 fn eq(&self, other: &Self) -> bool {
36 self.peer_id == other.peer_id
37 && self.tunnel_id == other.tunnel_id
38 && self.classification == other.classification
39 }
40}
41
42impl<C, M, R, W, LEN, CMD> ClassifiedWorker<CmdNodeTunnelClassification<C>>
43 for ClassifiedCmdSend<C, M, R, W, LEN, CMD>
44where
45 C: WorkerClassification,
46 M: CmdTunnelMeta,
47 R: ClassifiedCmdTunnelRead<C, M>,
48 W: ClassifiedCmdTunnelWrite<C, M>,
49 LEN: RawEncode
50 + for<'a> RawDecode<'a>
51 + Copy
52 + Send
53 + Sync
54 + 'static
55 + FromPrimitive
56 + ToPrimitive,
57 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
58{
59 fn is_work(&self) -> bool {
60 self.is_work && !self.recv_handle.is_finished()
61 }
62
63 fn is_valid(&self, c: CmdNodeTunnelClassification<C>) -> bool {
64 if c.peer_id.is_some() && c.peer_id.as_ref().unwrap() != &self.remote_id {
65 return false;
66 }
67
68 if c.tunnel_id.is_some() {
69 self.tunnel_id == c.tunnel_id.unwrap()
70 } else {
71 if c.classification.is_some() {
72 self.classification == c.classification.unwrap()
73 } else {
74 true
75 }
76 }
77 }
78
79 fn classification(&self) -> CmdNodeTunnelClassification<C> {
80 CmdNodeTunnelClassification {
81 peer_id: Some(self.remote_id.clone()),
82 tunnel_id: Some(self.tunnel_id),
83 classification: Some(self.classification.clone()),
84 }
85 }
86}
87
88#[async_trait::async_trait]
89pub trait ClassifiedCmdNodeTunnelFactory<
90 C: WorkerClassification,
91 M: CmdTunnelMeta,
92 R: ClassifiedCmdTunnelRead<C, M>,
93 W: ClassifiedCmdTunnelWrite<C, M>,
94>: Send + Sync + 'static
95{
96 async fn create_tunnel(
97 &self,
98 classification: Option<CmdNodeTunnelClassification<C>>,
99 ) -> CmdResult<Splittable<R, W>>;
100}
101
102struct CmdWriteFactoryImpl<
103 C: WorkerClassification,
104 M: CmdTunnelMeta,
105 R: ClassifiedCmdTunnelRead<C, M>,
106 W: ClassifiedCmdTunnelWrite<C, M>,
107 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
108 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
109 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
110 LISTENER: CmdTunnelListener<M, R, W>,
111> {
112 tunnel_listener: LISTENER,
113 tunnel_factory: F,
114 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
115 tunnel_id_generator: TunnelIdGenerator,
116 resp_waiter: RespWaiterRef,
117 send_cache: Arc<Mutex<HashMap<PeerId, Vec<ClassifiedCmdSend<C, M, R, W, LEN, CMD>>>>>,
118}
119
120impl<
121 C: WorkerClassification,
122 M: CmdTunnelMeta,
123 R: ClassifiedCmdTunnelRead<C, M>,
124 W: ClassifiedCmdTunnelWrite<C, M>,
125 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
126 LEN: RawEncode
127 + for<'a> RawDecode<'a>
128 + Copy
129 + Send
130 + Sync
131 + 'static
132 + FromPrimitive
133 + ToPrimitive
134 + RawFixedBytes,
135 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
136 LISTENER: CmdTunnelListener<M, R, W>,
137> CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
138{
139 pub fn new(
140 tunnel_factory: F,
141 tunnel_listener: LISTENER,
142 cmd_handler: impl CmdHandler<LEN, CMD>,
143 resp_waiter: RespWaiterRef,
144 ) -> Self {
145 Self {
146 tunnel_listener,
147 tunnel_factory,
148 cmd_handler: Arc::new(cmd_handler),
149 tunnel_id_generator: TunnelIdGenerator::new(),
150 resp_waiter,
151 send_cache: Arc::new(Mutex::new(Default::default())),
152 }
153 }
154
155 pub fn start(self: &Arc<Self>) {
156 let this = self.clone();
157 tokio::spawn(async move {
158 if let Err(e) = this.run().await {
159 log::error!("cmd server error: {:?}", e);
160 }
161 });
162 }
163
164 async fn run(self: &Arc<Self>) -> CmdResult<()> {
165 loop {
166 let tunnel = self.tunnel_listener.accept().await?;
167 let peer_id = tunnel.get_remote_peer_id();
168 let classification = tunnel.get_classification();
169 let tunnel_id = self.tunnel_id_generator.generate();
170 let this = self.clone();
171 let resp_waiter = self.resp_waiter.clone();
172 tokio::spawn(async move {
173 let ret: CmdResult<()> = async move {
174 let this = this.clone();
175 let cmd_handler = this.cmd_handler.clone();
176 let (reader, writer) = tunnel.split();
177 let tunnel_meta = reader.get_tunnel_meta();
178 let remote_id = writer.get_remote_peer_id();
179 let writer = ObjectHolder::new(writer);
180 let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
181 reader,
182 writer.clone(),
183 tunnel_id,
184 cmd_handler,
185 );
186 {
187 let mut send_cache = this.send_cache.lock().unwrap();
188 let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
189 send_list.push(ClassifiedCmdSend::new(
190 tunnel_id,
191 classification,
192 recv_handle,
193 writer,
194 resp_waiter,
195 remote_id,
196 tunnel_meta,
197 ));
198 }
199 Ok(())
200 }
201 .await;
202 if let Err(e) = ret {
203 log::error!("peer connection error: {:?}", e);
204 }
205 });
206 }
207 }
208}
209
210#[async_trait::async_trait]
211impl<
212 C: WorkerClassification,
213 M: CmdTunnelMeta,
214 R: ClassifiedCmdTunnelRead<C, M>,
215 W: ClassifiedCmdTunnelWrite<C, M>,
216 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
217 LEN: RawEncode
218 + for<'a> RawDecode<'a>
219 + Copy
220 + Send
221 + Sync
222 + 'static
223 + FromPrimitive
224 + ToPrimitive
225 + RawFixedBytes,
226 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
227 LISTENER: CmdTunnelListener<M, R, W>,
228> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
229 for CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
230{
231 async fn create(
232 &self,
233 c: Option<CmdNodeTunnelClassification<C>>,
234 ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
235 if c.is_some() {
236 let classification = c.unwrap();
237 if classification.peer_id.is_some() {
238 let peer_id = classification.peer_id.clone().unwrap();
239 let tunnel_id = classification.tunnel_id;
240 if tunnel_id.is_some() {
241 let mut send_cache = self.send_cache.lock().unwrap();
242 if let Some(send_list) = send_cache.get_mut(&peer_id) {
243 let mut send_index = None;
244 for (index, send) in send_list.iter().enumerate() {
245 if send.get_tunnel_id() == tunnel_id.unwrap() {
246 send_index = Some(index);
247 break;
248 }
249 }
250 if let Some(send_index) = send_index {
251 let send = send_list.remove(send_index);
252 Ok(send)
253 } else {
254 Err(pool_err!(
255 PoolErrorCode::Failed,
256 "tunnel {:?} not found",
257 tunnel_id.unwrap()
258 ))
259 }
260 } else {
261 Err(pool_err!(
262 PoolErrorCode::Failed,
263 "tunnel {:?} not found",
264 tunnel_id.unwrap()
265 ))
266 }
267 } else {
268 {
269 let mut send_cache = self.send_cache.lock().unwrap();
270 if let Some(send_list) = send_cache.get_mut(&peer_id) {
271 if !send_list.is_empty() {
272 let send = send_list.pop().unwrap();
273 if send_list.is_empty() {
274 send_cache.remove(&peer_id);
275 }
276 return Ok(send);
277 }
278 }
279 }
280 let tunnel = self
281 .tunnel_factory
282 .create_tunnel(Some(classification))
283 .await
284 .map_err(into_pool_err!(PoolErrorCode::Failed))?;
285 let classification = tunnel.get_classification();
286 let tunnel_id = self.tunnel_id_generator.generate();
287 let (recv, write) = tunnel.split();
288 let remote_id = write.get_remote_peer_id();
289 let tunnel_meta = recv.get_tunnel_meta();
290 let write = ObjectHolder::new(write);
291 let cmd_handler = self.cmd_handler.clone();
292 let handle = create_recv_handle::<M, R, W, LEN, CMD>(
293 recv,
294 write.clone(),
295 tunnel_id,
296 cmd_handler,
297 );
298 Ok(ClassifiedCmdSend::new(
299 tunnel_id,
300 classification,
301 handle,
302 write,
303 self.resp_waiter.clone(),
304 remote_id,
305 tunnel_meta,
306 ))
307 }
308 } else {
309 if classification.tunnel_id.is_some() {
310 Err(pool_err!(
311 PoolErrorCode::Failed,
312 "must set peer id when set tunnel id"
313 ))
314 } else {
315 let tunnel = self
316 .tunnel_factory
317 .create_tunnel(Some(classification))
318 .await
319 .map_err(into_pool_err!(PoolErrorCode::Failed))?;
320 let classification = tunnel.get_classification();
321 let tunnel_id = self.tunnel_id_generator.generate();
322 let (recv, write) = tunnel.split();
323 let remote_id = write.get_remote_peer_id();
324 let tunnel_meta = write.get_tunnel_meta();
325 let write = ObjectHolder::new(write);
326 let cmd_handler = self.cmd_handler.clone();
327 let handle = create_recv_handle::<M, R, W, LEN, CMD>(
328 recv,
329 write.clone(),
330 tunnel_id,
331 cmd_handler,
332 );
333 Ok(ClassifiedCmdSend::new(
334 tunnel_id,
335 classification,
336 handle,
337 write,
338 self.resp_waiter.clone(),
339 remote_id,
340 tunnel_meta,
341 ))
342 }
343 }
344 } else {
345 Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
346 }
347 }
348}
349
350pub struct ClassifiedCmdNodeWriteFactory<
351 C: WorkerClassification,
352 M: CmdTunnelMeta,
353 R: ClassifiedCmdTunnelRead<C, M>,
354 W: ClassifiedCmdTunnelWrite<C, M>,
355 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
356 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
357 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
358 LISTENER: CmdTunnelListener<M, R, W>,
359> {
360 inner: Arc<CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>>,
361}
362
363impl<
364 C: WorkerClassification,
365 M: CmdTunnelMeta,
366 R: ClassifiedCmdTunnelRead<C, M>,
367 W: ClassifiedCmdTunnelWrite<C, M>,
368 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
369 LEN: RawEncode
370 + for<'a> RawDecode<'a>
371 + Copy
372 + Send
373 + Sync
374 + 'static
375 + FromPrimitive
376 + ToPrimitive
377 + RawFixedBytes
378 + RawFixedBytes,
379 CMD: RawEncode
380 + for<'a> RawDecode<'a>
381 + Copy
382 + Send
383 + Sync
384 + 'static
385 + Debug
386 + RawFixedBytes
387 + RawFixedBytes,
388 LISTENER: CmdTunnelListener<M, R, W>,
389> ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
390{
391 pub(crate) fn new(
392 tunnel_factory: F,
393 tunnel_listener: LISTENER,
394 cmd_handler: impl CmdHandler<LEN, CMD>,
395 resp_waiter: RespWaiterRef,
396 ) -> Self {
397 Self {
398 inner: Arc::new(CmdWriteFactoryImpl::new(
399 tunnel_factory,
400 tunnel_listener,
401 cmd_handler,
402 resp_waiter,
403 )),
404 }
405 }
406
407 pub fn start(&self) {
408 self.inner.start();
409 }
410}
411
412#[async_trait::async_trait]
413impl<
414 C: WorkerClassification,
415 M: CmdTunnelMeta,
416 R: ClassifiedCmdTunnelRead<C, M>,
417 W: ClassifiedCmdTunnelWrite<C, M>,
418 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
419 LEN: RawEncode
420 + for<'a> RawDecode<'a>
421 + Copy
422 + Send
423 + Sync
424 + 'static
425 + FromPrimitive
426 + ToPrimitive
427 + RawFixedBytes,
428 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
429 LISTENER: CmdTunnelListener<M, R, W>,
430> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
431 for ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
432{
433 async fn create(
434 &self,
435 c: Option<CmdNodeTunnelClassification<C>>,
436 ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
437 self.inner.create(c).await
438 }
439}
440
441pub struct DefaultClassifiedCmdNode<
442 C: WorkerClassification,
443 M: CmdTunnelMeta,
444 R: ClassifiedCmdTunnelRead<C, M>,
445 W: ClassifiedCmdTunnelWrite<C, M>,
446 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
447 LEN: RawEncode
448 + for<'a> RawDecode<'a>
449 + Copy
450 + Send
451 + Sync
452 + 'static
453 + FromPrimitive
454 + ToPrimitive
455 + RawFixedBytes,
456 CMD: RawEncode
457 + for<'a> RawDecode<'a>
458 + Copy
459 + Send
460 + Sync
461 + 'static
462 + RawFixedBytes
463 + Eq
464 + Hash
465 + Debug,
466 LISTENER: CmdTunnelListener<M, R, W>,
467> {
468 tunnel_pool: ClassifiedWorkerPoolRef<
469 CmdNodeTunnelClassification<C>,
470 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
471 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
472 >,
473 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
474}
475
476impl<
477 C: WorkerClassification,
478 M: CmdTunnelMeta,
479 R: ClassifiedCmdTunnelRead<C, M>,
480 W: ClassifiedCmdTunnelWrite<C, M>,
481 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
482 LEN: RawEncode
483 + for<'a> RawDecode<'a>
484 + Copy
485 + Send
486 + Sync
487 + 'static
488 + FromPrimitive
489 + ToPrimitive
490 + RawFixedBytes,
491 CMD: RawEncode
492 + for<'a> RawDecode<'a>
493 + Copy
494 + Send
495 + Sync
496 + 'static
497 + RawFixedBytes
498 + Eq
499 + Hash
500 + Debug,
501 LISTENER: CmdTunnelListener<M, R, W>,
502> DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
503{
504 pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
505 let cmd_handler_map = Arc::new(CmdHandlerMap::new());
506 let handler_map = cmd_handler_map.clone();
507 let resp_waiter = Arc::new(RespWaiter::new());
508 let waiter = resp_waiter.clone();
509 let write_factory = ClassifiedCmdNodeWriteFactory::<C, M, R, W, _, LEN, CMD, LISTENER>::new(
510 factory,
511 listener,
512 move |local_id: PeerId,
513 peer_id: PeerId,
514 tunnel_id: TunnelId,
515 header: CmdHeader<LEN, CMD>,
516 body_read: CmdBody| {
517 let handler_map = handler_map.clone();
518 let waiter = waiter.clone();
519 async move {
520 if header.is_resp() && header.seq().is_some() {
521 let resp_id =
522 gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
523 let _ = waiter.set_result(resp_id, body_read);
524 Ok(None)
525 } else {
526 if let Some(handler) = handler_map.get(header.cmd_code()) {
527 handler
528 .handle(local_id, peer_id, tunnel_id, header, body_read)
529 .await
530 } else {
531 Ok(None)
532 }
533 }
534 }
535 },
536 resp_waiter.clone(),
537 );
538 write_factory.start();
539 Arc::new(Self {
540 tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
541 cmd_handler_map,
542 })
543 }
544
545 async fn get_send(
546 &self,
547 peer_id: PeerId,
548 ) -> CmdResult<
549 ClassifiedWorkerGuard<
550 CmdNodeTunnelClassification<C>,
551 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
552 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
553 >,
554 > {
555 self.tunnel_pool
556 .get_classified_worker(CmdNodeTunnelClassification {
557 peer_id: Some(peer_id),
558 tunnel_id: None,
559 classification: None,
560 })
561 .await
562 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
563 }
564
565 async fn get_send_of_tunnel_id(
566 &self,
567 peer_id: PeerId,
568 tunnel_id: TunnelId,
569 ) -> CmdResult<
570 ClassifiedWorkerGuard<
571 CmdNodeTunnelClassification<C>,
572 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
573 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
574 >,
575 > {
576 self.tunnel_pool
577 .get_classified_worker(CmdNodeTunnelClassification {
578 peer_id: Some(peer_id),
579 tunnel_id: Some(tunnel_id),
580 classification: None,
581 })
582 .await
583 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
584 }
585
586 async fn get_classified_send(
587 &self,
588 classification: C,
589 ) -> CmdResult<
590 ClassifiedWorkerGuard<
591 CmdNodeTunnelClassification<C>,
592 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
593 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
594 >,
595 > {
596 self.tunnel_pool
597 .get_classified_worker(CmdNodeTunnelClassification {
598 peer_id: None,
599 tunnel_id: None,
600 classification: Some(classification),
601 })
602 .await
603 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
604 }
605
606 async fn get_peer_classified_send(
607 &self,
608 peer_id: PeerId,
609 classification: C,
610 ) -> CmdResult<
611 ClassifiedWorkerGuard<
612 CmdNodeTunnelClassification<C>,
613 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
614 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
615 >,
616 > {
617 self.tunnel_pool
618 .get_classified_worker(CmdNodeTunnelClassification {
619 peer_id: Some(peer_id),
620 tunnel_id: None,
621 classification: Some(classification),
622 })
623 .await
624 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
625 }
626}
627
/// The `ClassifiedSendGuard` instantiation returned by the `get_send*`
/// methods of `DefaultClassifiedCmdNode`'s command-node trait impls; it wraps
/// a pooled `ClassifiedCmdSend` worker guard.
pub type ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
    CmdNodeTunnelClassification<C>,
    M,
    ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
    ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
>;
634#[async_trait::async_trait]
635impl<
636 C: WorkerClassification,
637 M: CmdTunnelMeta,
638 R: ClassifiedCmdTunnelRead<C, M>,
639 W: ClassifiedCmdTunnelWrite<C, M>,
640 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
641 LEN: RawEncode
642 + for<'a> RawDecode<'a>
643 + Copy
644 + Send
645 + Sync
646 + 'static
647 + FromPrimitive
648 + ToPrimitive
649 + RawFixedBytes,
650 CMD: RawEncode
651 + for<'a> RawDecode<'a>
652 + Copy
653 + Send
654 + Sync
655 + 'static
656 + RawFixedBytes
657 + Eq
658 + Hash
659 + Debug,
660 LISTENER: CmdTunnelListener<M, R, W>,
661>
662 CmdNode<
663 LEN,
664 CMD,
665 M,
666 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
667 ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
668 > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
669{
670 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
671 self.cmd_handler_map.insert(cmd, handler)
672 }
673
674 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
675 let mut send = self.get_send(peer_id.clone()).await?;
676 send.send(cmd, version, body).await
677 }
678
679 async fn send_with_resp(
680 &self,
681 peer_id: &PeerId,
682 cmd: CMD,
683 version: u8,
684 body: &[u8],
685 timeout: Duration,
686 ) -> CmdResult<CmdBody> {
687 let mut send = self.get_send(peer_id.clone()).await?;
688 send.send_with_resp(cmd, version, body, timeout).await
689 }
690
691 async fn send2(
692 &self,
693 peer_id: &PeerId,
694 cmd: CMD,
695 version: u8,
696 body: &[&[u8]],
697 ) -> CmdResult<()> {
698 let mut send = self.get_send(peer_id.clone()).await?;
699 send.send2(cmd, version, body).await
700 }
701
702 async fn send2_with_resp(
703 &self,
704 peer_id: &PeerId,
705 cmd: CMD,
706 version: u8,
707 body: &[&[u8]],
708 timeout: Duration,
709 ) -> CmdResult<CmdBody> {
710 let mut send = self.get_send(peer_id.clone()).await?;
711 send.send2_with_resp(cmd, version, body, timeout).await
712 }
713
714 async fn send_cmd(
715 &self,
716 peer_id: &PeerId,
717 cmd: CMD,
718 version: u8,
719 body: CmdBody,
720 ) -> CmdResult<()> {
721 let mut send = self.get_send(peer_id.clone()).await?;
722 send.send_cmd(cmd, version, body).await
723 }
724
725 async fn send_cmd_with_resp(
726 &self,
727 peer_id: &PeerId,
728 cmd: CMD,
729 version: u8,
730 body: CmdBody,
731 timeout: Duration,
732 ) -> CmdResult<CmdBody> {
733 let mut send = self.get_send(peer_id.clone()).await?;
734 send.send_cmd_with_resp(cmd, version, body, timeout).await
735 }
736
737 async fn send_by_specify_tunnel(
738 &self,
739 peer_id: &PeerId,
740 tunnel_id: TunnelId,
741 cmd: CMD,
742 version: u8,
743 body: &[u8],
744 ) -> CmdResult<()> {
745 let mut send = self
746 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
747 .await?;
748 send.send(cmd, version, body).await
749 }
750
751 async fn send_by_specify_tunnel_with_resp(
752 &self,
753 peer_id: &PeerId,
754 tunnel_id: TunnelId,
755 cmd: CMD,
756 version: u8,
757 body: &[u8],
758 timeout: Duration,
759 ) -> CmdResult<CmdBody> {
760 let mut send = self
761 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
762 .await?;
763 send.send_with_resp(cmd, version, body, timeout).await
764 }
765
766 async fn send2_by_specify_tunnel(
767 &self,
768 peer_id: &PeerId,
769 tunnel_id: TunnelId,
770 cmd: CMD,
771 version: u8,
772 body: &[&[u8]],
773 ) -> CmdResult<()> {
774 let mut send = self
775 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
776 .await?;
777 send.send2(cmd, version, body).await
778 }
779
780 async fn send2_by_specify_tunnel_with_resp(
781 &self,
782 peer_id: &PeerId,
783 tunnel_id: TunnelId,
784 cmd: CMD,
785 version: u8,
786 body: &[&[u8]],
787 timeout: Duration,
788 ) -> CmdResult<CmdBody> {
789 let mut send = self
790 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
791 .await?;
792 send.send2_with_resp(cmd, version, body, timeout).await
793 }
794
795 async fn send_cmd_by_specify_tunnel(
796 &self,
797 peer_id: &PeerId,
798 tunnel_id: TunnelId,
799 cmd: CMD,
800 version: u8,
801 body: CmdBody,
802 ) -> CmdResult<()> {
803 let mut send = self
804 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
805 .await?;
806 send.send_cmd(cmd, version, body).await
807 }
808
809 async fn send_cmd_by_specify_tunnel_with_resp(
810 &self,
811 peer_id: &PeerId,
812 tunnel_id: TunnelId,
813 cmd: CMD,
814 version: u8,
815 body: CmdBody,
816 timeout: Duration,
817 ) -> CmdResult<CmdBody> {
818 let mut send = self
819 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
820 .await?;
821 send.send_cmd_with_resp(cmd, version, body, timeout).await
822 }
823
824 async fn clear_all_tunnel(&self) {
825 self.tunnel_pool.clear_all_worker().await;
826 }
827
828 async fn get_send(
829 &self,
830 peer_id: &PeerId,
831 tunnel_id: TunnelId,
832 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
833 Ok(ClassifiedSendGuard {
834 worker_guard: self
835 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
836 .await?,
837 _p: Default::default(),
838 })
839 }
840}
841
842#[async_trait::async_trait]
843impl<
844 C: WorkerClassification,
845 M: CmdTunnelMeta,
846 R: ClassifiedCmdTunnelRead<C, M>,
847 W: ClassifiedCmdTunnelWrite<C, M>,
848 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
849 LEN: RawEncode
850 + for<'a> RawDecode<'a>
851 + Copy
852 + Send
853 + Sync
854 + 'static
855 + FromPrimitive
856 + ToPrimitive
857 + RawFixedBytes,
858 CMD: RawEncode
859 + for<'a> RawDecode<'a>
860 + Copy
861 + Send
862 + Sync
863 + 'static
864 + RawFixedBytes
865 + Eq
866 + Hash
867 + Debug,
868 LISTENER: CmdTunnelListener<M, R, W>,
869>
870 ClassifiedCmdNode<
871 LEN,
872 CMD,
873 C,
874 M,
875 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
876 ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
877 > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
878{
879 async fn send_by_classified_tunnel(
880 &self,
881 classification: C,
882 cmd: CMD,
883 version: u8,
884 body: &[u8],
885 ) -> CmdResult<()> {
886 let mut send = self.get_classified_send(classification).await?;
887 send.send(cmd, version, body).await
888 }
889
890 async fn send_by_classified_tunnel_with_resp(
891 &self,
892 classification: C,
893 cmd: CMD,
894 version: u8,
895 body: &[u8],
896 timeout: Duration,
897 ) -> CmdResult<CmdBody> {
898 let mut send = self.get_classified_send(classification).await?;
899 send.send_with_resp(cmd, version, body, timeout).await
900 }
901
902 async fn send2_by_classified_tunnel(
903 &self,
904 classification: C,
905 cmd: CMD,
906 version: u8,
907 body: &[&[u8]],
908 ) -> CmdResult<()> {
909 let mut send = self.get_classified_send(classification).await?;
910 send.send2(cmd, version, body).await
911 }
912
913 async fn send2_by_classified_tunnel_with_resp(
914 &self,
915 classification: C,
916 cmd: CMD,
917 version: u8,
918 body: &[&[u8]],
919 timeout: Duration,
920 ) -> CmdResult<CmdBody> {
921 let mut send = self.get_classified_send(classification).await?;
922 send.send2_with_resp(cmd, version, body, timeout).await
923 }
924
925 async fn send_cmd_by_classified_tunnel(
926 &self,
927 classification: C,
928 cmd: CMD,
929 version: u8,
930 body: CmdBody,
931 ) -> CmdResult<()> {
932 let mut send = self.get_classified_send(classification).await?;
933 send.send_cmd(cmd, version, body).await
934 }
935
936 async fn send_cmd_by_classified_tunnel_with_resp(
937 &self,
938 classification: C,
939 cmd: CMD,
940 version: u8,
941 body: CmdBody,
942 timeout: Duration,
943 ) -> CmdResult<CmdBody> {
944 let mut send = self.get_classified_send(classification).await?;
945 send.send_cmd_with_resp(cmd, version, body, timeout).await
946 }
947
948 async fn send_by_peer_classified_tunnel(
949 &self,
950 peer_id: &PeerId,
951 classification: C,
952 cmd: CMD,
953 version: u8,
954 body: &[u8],
955 ) -> CmdResult<()> {
956 let mut send = self
957 .get_peer_classified_send(peer_id.clone(), classification)
958 .await?;
959 send.send(cmd, version, body).await
960 }
961
962 async fn send_by_peer_classified_tunnel_with_resp(
963 &self,
964 peer_id: &PeerId,
965 classification: C,
966 cmd: CMD,
967 version: u8,
968 body: &[u8],
969 timeout: Duration,
970 ) -> CmdResult<CmdBody> {
971 let mut send = self
972 .get_peer_classified_send(peer_id.clone(), classification)
973 .await?;
974 send.send_with_resp(cmd, version, body, timeout).await
975 }
976
977 async fn send2_by_peer_classified_tunnel(
978 &self,
979 peer_id: &PeerId,
980 classification: C,
981 cmd: CMD,
982 version: u8,
983 body: &[&[u8]],
984 ) -> CmdResult<()> {
985 let mut send = self
986 .get_peer_classified_send(peer_id.clone(), classification)
987 .await?;
988 send.send2(cmd, version, body).await
989 }
990
991 async fn send2_by_peer_classified_tunnel_with_resp(
992 &self,
993 peer_id: &PeerId,
994 classification: C,
995 cmd: CMD,
996 version: u8,
997 body: &[&[u8]],
998 timeout: Duration,
999 ) -> CmdResult<CmdBody> {
1000 let mut send = self
1001 .get_peer_classified_send(peer_id.clone(), classification)
1002 .await?;
1003 send.send2_with_resp(cmd, version, body, timeout).await
1004 }
1005
1006 async fn send_cmd_by_peer_classified_tunnel(
1007 &self,
1008 peer_id: &PeerId,
1009 classification: C,
1010 cmd: CMD,
1011 version: u8,
1012 body: CmdBody,
1013 ) -> CmdResult<()> {
1014 let mut send = self
1015 .get_peer_classified_send(peer_id.clone(), classification)
1016 .await?;
1017 send.send_cmd(cmd, version, body).await
1018 }
1019
1020 async fn send_cmd_by_peer_classified_tunnel_with_resp(
1021 &self,
1022 peer_id: &PeerId,
1023 classification: C,
1024 cmd: CMD,
1025 version: u8,
1026 body: CmdBody,
1027 timeout: Duration,
1028 ) -> CmdResult<CmdBody> {
1029 let mut send = self
1030 .get_peer_classified_send(peer_id.clone(), classification)
1031 .await?;
1032 send.send_cmd_with_resp(cmd, version, body, timeout).await
1033 }
1034
1035 async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId> {
1036 let send = self.get_classified_send(classification).await?;
1037 Ok(send.get_tunnel_id())
1038 }
1039
1040 async fn find_tunnel_id_by_peer_classified(
1041 &self,
1042 peer_id: &PeerId,
1043 classification: C,
1044 ) -> CmdResult<TunnelId> {
1045 let send = self
1046 .get_peer_classified_send(peer_id.clone(), classification)
1047 .await?;
1048 Ok(send.get_tunnel_id())
1049 }
1050
1051 async fn get_send_by_classified(
1052 &self,
1053 classification: C,
1054 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1055 Ok(ClassifiedSendGuard {
1056 worker_guard: self.get_classified_send(classification).await?,
1057 _p: Default::default(),
1058 })
1059 }
1060
1061 async fn get_send_by_peer_classified(
1062 &self,
1063 peer_id: &PeerId,
1064 classification: C,
1065 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1066 Ok(ClassifiedSendGuard {
1067 worker_guard: self
1068 .get_peer_classified_send(peer_id.clone(), classification)
1069 .await?,
1070 _p: Default::default(),
1071 })
1072 }
1073}