1use crate::client::{
2 ClassifiedCmdSend, ClassifiedCmdTunnelRead, ClassifiedCmdTunnelWrite, ClassifiedSendGuard,
3 RespWaiter, RespWaiterRef, gen_resp_id,
4};
5use crate::cmd::CmdHandlerMap;
6use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
7use crate::node::create_recv_handle;
8use crate::server::CmdTunnelListener;
9use crate::{
10 ClassifiedCmdNode, CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, PeerId, TunnelId,
11 TunnelIdGenerator, into_pool_err, pool_err,
12};
13use async_named_locker::ObjectHolder;
14use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
15use num::{FromPrimitive, ToPrimitive};
16use sfo_pool::{
17 ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
18 ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification,
19};
20use sfo_split::Splittable;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::hash::Hash;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27#[derive(Debug, Clone, Eq, Hash)]
28pub struct CmdNodeTunnelClassification<C: WorkerClassification> {
29 pub peer_id: Option<PeerId>,
30 pub tunnel_id: Option<TunnelId>,
31 pub classification: Option<C>,
32}
33
34impl<C: WorkerClassification> PartialEq for CmdNodeTunnelClassification<C> {
35 fn eq(&self, other: &Self) -> bool {
36 self.peer_id == other.peer_id
37 && self.tunnel_id == other.tunnel_id
38 && self.classification == other.classification
39 }
40}
41
42impl<C, M, R, W, LEN, CMD> ClassifiedWorker<CmdNodeTunnelClassification<C>>
43 for ClassifiedCmdSend<C, M, R, W, LEN, CMD>
44where
45 C: WorkerClassification,
46 M: CmdTunnelMeta,
47 R: ClassifiedCmdTunnelRead<C, M>,
48 W: ClassifiedCmdTunnelWrite<C, M>,
49 LEN: RawEncode
50 + for<'a> RawDecode<'a>
51 + Copy
52 + Send
53 + Sync
54 + 'static
55 + FromPrimitive
56 + ToPrimitive,
57 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
58{
59 fn is_work(&self) -> bool {
60 self.is_work && !self.recv_handle.is_finished()
61 }
62
63 fn is_valid(&self, c: CmdNodeTunnelClassification<C>) -> bool {
64 if c.peer_id.is_some() && c.peer_id.as_ref().unwrap() != &self.remote_id {
65 return false;
66 }
67
68 if c.tunnel_id.is_some() {
69 self.tunnel_id == c.tunnel_id.unwrap()
70 } else {
71 if c.classification.is_some() {
72 self.classification == c.classification.unwrap()
73 } else {
74 true
75 }
76 }
77 }
78
79 fn classification(&self) -> CmdNodeTunnelClassification<C> {
80 CmdNodeTunnelClassification {
81 peer_id: Some(self.remote_id.clone()),
82 tunnel_id: Some(self.tunnel_id),
83 classification: Some(self.classification.clone()),
84 }
85 }
86}
87
88#[async_trait::async_trait]
89pub trait ClassifiedCmdNodeTunnelFactory<
90 C: WorkerClassification,
91 M: CmdTunnelMeta,
92 R: ClassifiedCmdTunnelRead<C, M>,
93 W: ClassifiedCmdTunnelWrite<C, M>,
94>: Send + Sync + 'static
95{
96 async fn create_tunnel(
97 &self,
98 classification: Option<CmdNodeTunnelClassification<C>>,
99 ) -> CmdResult<Splittable<R, W>>;
100}
101
102struct CmdWriteFactoryImpl<
103 C: WorkerClassification,
104 M: CmdTunnelMeta,
105 R: ClassifiedCmdTunnelRead<C, M>,
106 W: ClassifiedCmdTunnelWrite<C, M>,
107 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
108 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
109 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
110 LISTENER: CmdTunnelListener<M, R, W>,
111> {
112 tunnel_listener: LISTENER,
113 tunnel_factory: F,
114 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
115 tunnel_id_generator: TunnelIdGenerator,
116 resp_waiter: RespWaiterRef,
117 send_cache: Arc<Mutex<HashMap<PeerId, Vec<ClassifiedCmdSend<C, M, R, W, LEN, CMD>>>>>,
118}
119
120impl<
121 C: WorkerClassification,
122 M: CmdTunnelMeta,
123 R: ClassifiedCmdTunnelRead<C, M>,
124 W: ClassifiedCmdTunnelWrite<C, M>,
125 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
126 LEN: RawEncode
127 + for<'a> RawDecode<'a>
128 + Copy
129 + Send
130 + Sync
131 + 'static
132 + FromPrimitive
133 + ToPrimitive
134 + RawFixedBytes,
135 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
136 LISTENER: CmdTunnelListener<M, R, W>,
137> CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
138{
139 pub fn new(
140 tunnel_factory: F,
141 tunnel_listener: LISTENER,
142 cmd_handler: impl CmdHandler<LEN, CMD>,
143 resp_waiter: RespWaiterRef,
144 ) -> Self {
145 Self {
146 tunnel_listener,
147 tunnel_factory,
148 cmd_handler: Arc::new(cmd_handler),
149 tunnel_id_generator: TunnelIdGenerator::new(),
150 resp_waiter,
151 send_cache: Arc::new(Mutex::new(Default::default())),
152 }
153 }
154
155 pub fn start(self: &Arc<Self>) {
156 let this = self.clone();
157 tokio::spawn(async move {
158 if let Err(e) = this.run().await {
159 log::error!("cmd server error: {:?}", e);
160 }
161 });
162 }
163
164 async fn run(self: &Arc<Self>) -> CmdResult<()> {
165 loop {
166 let tunnel = self.tunnel_listener.accept().await?;
167 let peer_id = tunnel.get_remote_peer_id();
168 let classification = tunnel.get_classification();
169 let tunnel_id = self.tunnel_id_generator.generate();
170 let this = self.clone();
171 let resp_waiter = self.resp_waiter.clone();
172 tokio::spawn(async move {
173 let ret: CmdResult<()> = async move {
174 let this = this.clone();
175 let cmd_handler = this.cmd_handler.clone();
176 let (reader, writer) = tunnel.split();
177 let tunnel_meta = reader.get_tunnel_meta();
178 let remote_id = writer.get_remote_peer_id();
179 let writer = ObjectHolder::new(writer);
180 let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
181 reader,
182 writer.clone(),
183 tunnel_id,
184 cmd_handler,
185 );
186 {
187 let mut send_cache = this.send_cache.lock().unwrap();
188 let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
189 send_list.push(ClassifiedCmdSend::new(
190 tunnel_id,
191 classification,
192 recv_handle,
193 writer,
194 resp_waiter,
195 remote_id,
196 tunnel_meta,
197 ));
198 }
199 Ok(())
200 }
201 .await;
202 if let Err(e) = ret {
203 log::error!("peer connection error: {:?}", e);
204 }
205 });
206 }
207 }
208}
209
210#[async_trait::async_trait]
211impl<
212 C: WorkerClassification,
213 M: CmdTunnelMeta,
214 R: ClassifiedCmdTunnelRead<C, M>,
215 W: ClassifiedCmdTunnelWrite<C, M>,
216 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
217 LEN: RawEncode
218 + for<'a> RawDecode<'a>
219 + Copy
220 + Send
221 + Sync
222 + 'static
223 + FromPrimitive
224 + ToPrimitive
225 + RawFixedBytes,
226 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
227 LISTENER: CmdTunnelListener<M, R, W>,
228> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
229 for CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
230{
231 async fn create(
232 &self,
233 c: Option<CmdNodeTunnelClassification<C>>,
234 ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
235 if c.is_some() {
236 let classification = c.unwrap();
237 if classification.peer_id.is_some() {
238 let peer_id = classification.peer_id.clone().unwrap();
239 let tunnel_id = classification.tunnel_id;
240 if tunnel_id.is_some() {
241 let mut send_cache = self.send_cache.lock().unwrap();
242 if let Some(send_list) = send_cache.get_mut(&peer_id) {
243 let mut send_index = None;
244 for (index, send) in send_list.iter().enumerate() {
245 if send.get_tunnel_id() == tunnel_id.unwrap() {
246 send_index = Some(index);
247 break;
248 }
249 }
250 if let Some(send_index) = send_index {
251 let send = send_list.remove(send_index);
252 Ok(send)
253 } else {
254 Err(pool_err!(
255 PoolErrorCode::Failed,
256 "tunnel {:?} not found",
257 tunnel_id.unwrap()
258 ))
259 }
260 } else {
261 Err(pool_err!(
262 PoolErrorCode::Failed,
263 "tunnel {:?} not found",
264 tunnel_id.unwrap()
265 ))
266 }
267 } else {
268 {
269 let mut send_cache = self.send_cache.lock().unwrap();
270 if let Some(send_list) = send_cache.get_mut(&peer_id) {
271 if !send_list.is_empty() {
272 let send = send_list.pop().unwrap();
273 if send_list.is_empty() {
274 send_cache.remove(&peer_id);
275 }
276 return Ok(send);
277 }
278 }
279 }
280 let tunnel = self
281 .tunnel_factory
282 .create_tunnel(Some(classification))
283 .await
284 .map_err(into_pool_err!(PoolErrorCode::Failed))?;
285 let classification = tunnel.get_classification();
286 let tunnel_id = self.tunnel_id_generator.generate();
287 let (recv, write) = tunnel.split();
288 let remote_id = write.get_remote_peer_id();
289 let tunnel_meta = recv.get_tunnel_meta();
290 let write = ObjectHolder::new(write);
291 let cmd_handler = self.cmd_handler.clone();
292 let handle = create_recv_handle::<M, R, W, LEN, CMD>(
293 recv,
294 write.clone(),
295 tunnel_id,
296 cmd_handler,
297 );
298 Ok(ClassifiedCmdSend::new(
299 tunnel_id,
300 classification,
301 handle,
302 write,
303 self.resp_waiter.clone(),
304 remote_id,
305 tunnel_meta,
306 ))
307 }
308 } else {
309 if classification.tunnel_id.is_some() {
310 Err(pool_err!(
311 PoolErrorCode::Failed,
312 "must set peer id when set tunnel id"
313 ))
314 } else {
315 let tunnel = self
316 .tunnel_factory
317 .create_tunnel(Some(classification))
318 .await
319 .map_err(into_pool_err!(PoolErrorCode::Failed))?;
320 let classification = tunnel.get_classification();
321 let tunnel_id = self.tunnel_id_generator.generate();
322 let (recv, write) = tunnel.split();
323 let remote_id = write.get_remote_peer_id();
324 let tunnel_meta = write.get_tunnel_meta();
325 let write = ObjectHolder::new(write);
326 let cmd_handler = self.cmd_handler.clone();
327 let handle = create_recv_handle::<M, R, W, LEN, CMD>(
328 recv,
329 write.clone(),
330 tunnel_id,
331 cmd_handler,
332 );
333 Ok(ClassifiedCmdSend::new(
334 tunnel_id,
335 classification,
336 handle,
337 write,
338 self.resp_waiter.clone(),
339 remote_id,
340 tunnel_meta,
341 ))
342 }
343 }
344 } else {
345 Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
346 }
347 }
348}
349
350pub struct ClassifiedCmdNodeWriteFactory<
351 C: WorkerClassification,
352 M: CmdTunnelMeta,
353 R: ClassifiedCmdTunnelRead<C, M>,
354 W: ClassifiedCmdTunnelWrite<C, M>,
355 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
356 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
357 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
358 LISTENER: CmdTunnelListener<M, R, W>,
359> {
360 inner: Arc<CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>>,
361}
362
363impl<
364 C: WorkerClassification,
365 M: CmdTunnelMeta,
366 R: ClassifiedCmdTunnelRead<C, M>,
367 W: ClassifiedCmdTunnelWrite<C, M>,
368 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
369 LEN: RawEncode
370 + for<'a> RawDecode<'a>
371 + Copy
372 + Send
373 + Sync
374 + 'static
375 + FromPrimitive
376 + ToPrimitive
377 + RawFixedBytes
378 + RawFixedBytes,
379 CMD: RawEncode
380 + for<'a> RawDecode<'a>
381 + Copy
382 + Send
383 + Sync
384 + 'static
385 + Debug
386 + RawFixedBytes
387 + RawFixedBytes,
388 LISTENER: CmdTunnelListener<M, R, W>,
389> ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
390{
391 pub(crate) fn new(
392 tunnel_factory: F,
393 tunnel_listener: LISTENER,
394 cmd_handler: impl CmdHandler<LEN, CMD>,
395 resp_waiter: RespWaiterRef,
396 ) -> Self {
397 Self {
398 inner: Arc::new(CmdWriteFactoryImpl::new(
399 tunnel_factory,
400 tunnel_listener,
401 cmd_handler,
402 resp_waiter,
403 )),
404 }
405 }
406
407 pub fn start(&self) {
408 self.inner.start();
409 }
410}
411
412#[async_trait::async_trait]
413impl<
414 C: WorkerClassification,
415 M: CmdTunnelMeta,
416 R: ClassifiedCmdTunnelRead<C, M>,
417 W: ClassifiedCmdTunnelWrite<C, M>,
418 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
419 LEN: RawEncode
420 + for<'a> RawDecode<'a>
421 + Copy
422 + Send
423 + Sync
424 + 'static
425 + FromPrimitive
426 + ToPrimitive
427 + RawFixedBytes,
428 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
429 LISTENER: CmdTunnelListener<M, R, W>,
430> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
431 for ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
432{
433 async fn create(
434 &self,
435 c: Option<CmdNodeTunnelClassification<C>>,
436 ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
437 self.inner.create(c).await
438 }
439}
440
441pub struct DefaultClassifiedCmdNode<
442 C: WorkerClassification,
443 M: CmdTunnelMeta,
444 R: ClassifiedCmdTunnelRead<C, M>,
445 W: ClassifiedCmdTunnelWrite<C, M>,
446 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
447 LEN: RawEncode
448 + for<'a> RawDecode<'a>
449 + Copy
450 + Send
451 + Sync
452 + 'static
453 + FromPrimitive
454 + ToPrimitive
455 + RawFixedBytes,
456 CMD: RawEncode
457 + for<'a> RawDecode<'a>
458 + Copy
459 + Send
460 + Sync
461 + 'static
462 + RawFixedBytes
463 + Eq
464 + Hash
465 + Debug,
466 LISTENER: CmdTunnelListener<M, R, W>,
467> {
468 tunnel_pool: ClassifiedWorkerPoolRef<
469 CmdNodeTunnelClassification<C>,
470 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
471 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
472 >,
473 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
474}
475
476impl<
477 C: WorkerClassification,
478 M: CmdTunnelMeta,
479 R: ClassifiedCmdTunnelRead<C, M>,
480 W: ClassifiedCmdTunnelWrite<C, M>,
481 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
482 LEN: RawEncode
483 + for<'a> RawDecode<'a>
484 + Copy
485 + Send
486 + Sync
487 + 'static
488 + FromPrimitive
489 + ToPrimitive
490 + RawFixedBytes,
491 CMD: RawEncode
492 + for<'a> RawDecode<'a>
493 + Copy
494 + Send
495 + Sync
496 + 'static
497 + RawFixedBytes
498 + Eq
499 + Hash
500 + Debug,
501 LISTENER: CmdTunnelListener<M, R, W>,
502> DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
503{
504 pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
505 let cmd_handler_map = Arc::new(CmdHandlerMap::new());
506 let handler_map = cmd_handler_map.clone();
507 let resp_waiter = Arc::new(RespWaiter::new());
508 let waiter = resp_waiter.clone();
509 let write_factory = ClassifiedCmdNodeWriteFactory::<C, M, R, W, _, LEN, CMD, LISTENER>::new(
510 factory,
511 listener,
512 move |peer_id: PeerId,
513 tunnel_id: TunnelId,
514 header: CmdHeader<LEN, CMD>,
515 body_read: CmdBody| {
516 let handler_map = handler_map.clone();
517 let waiter = waiter.clone();
518 async move {
519 if header.is_resp() && header.seq().is_some() {
520 let resp_id =
521 gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
522 let _ = waiter.set_result(resp_id, body_read);
523 Ok(None)
524 } else {
525 if let Some(handler) = handler_map.get(header.cmd_code()) {
526 handler.handle(peer_id, tunnel_id, header, body_read).await
527 } else {
528 Ok(None)
529 }
530 }
531 }
532 },
533 resp_waiter.clone(),
534 );
535 write_factory.start();
536 Arc::new(Self {
537 tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
538 cmd_handler_map,
539 })
540 }
541
542 async fn get_send(
543 &self,
544 peer_id: PeerId,
545 ) -> CmdResult<
546 ClassifiedWorkerGuard<
547 CmdNodeTunnelClassification<C>,
548 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
549 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
550 >,
551 > {
552 self.tunnel_pool
553 .get_classified_worker(CmdNodeTunnelClassification {
554 peer_id: Some(peer_id),
555 tunnel_id: None,
556 classification: None,
557 })
558 .await
559 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
560 }
561
562 async fn get_send_of_tunnel_id(
563 &self,
564 peer_id: PeerId,
565 tunnel_id: TunnelId,
566 ) -> CmdResult<
567 ClassifiedWorkerGuard<
568 CmdNodeTunnelClassification<C>,
569 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
570 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
571 >,
572 > {
573 self.tunnel_pool
574 .get_classified_worker(CmdNodeTunnelClassification {
575 peer_id: Some(peer_id),
576 tunnel_id: Some(tunnel_id),
577 classification: None,
578 })
579 .await
580 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
581 }
582
583 async fn get_classified_send(
584 &self,
585 classification: C,
586 ) -> CmdResult<
587 ClassifiedWorkerGuard<
588 CmdNodeTunnelClassification<C>,
589 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
590 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
591 >,
592 > {
593 self.tunnel_pool
594 .get_classified_worker(CmdNodeTunnelClassification {
595 peer_id: None,
596 tunnel_id: None,
597 classification: Some(classification),
598 })
599 .await
600 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
601 }
602
603 async fn get_peer_classified_send(
604 &self,
605 peer_id: PeerId,
606 classification: C,
607 ) -> CmdResult<
608 ClassifiedWorkerGuard<
609 CmdNodeTunnelClassification<C>,
610 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
611 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
612 >,
613 > {
614 self.tunnel_pool
615 .get_classified_worker(CmdNodeTunnelClassification {
616 peer_id: Some(peer_id),
617 tunnel_id: None,
618 classification: Some(classification),
619 })
620 .await
621 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
622 }
623}
624
625pub type ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
626 CmdNodeTunnelClassification<C>,
627 M,
628 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
629 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
630>;
631#[async_trait::async_trait]
632impl<
633 C: WorkerClassification,
634 M: CmdTunnelMeta,
635 R: ClassifiedCmdTunnelRead<C, M>,
636 W: ClassifiedCmdTunnelWrite<C, M>,
637 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
638 LEN: RawEncode
639 + for<'a> RawDecode<'a>
640 + Copy
641 + Send
642 + Sync
643 + 'static
644 + FromPrimitive
645 + ToPrimitive
646 + RawFixedBytes,
647 CMD: RawEncode
648 + for<'a> RawDecode<'a>
649 + Copy
650 + Send
651 + Sync
652 + 'static
653 + RawFixedBytes
654 + Eq
655 + Hash
656 + Debug,
657 LISTENER: CmdTunnelListener<M, R, W>,
658>
659 CmdNode<
660 LEN,
661 CMD,
662 M,
663 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
664 ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
665 > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
666{
667 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
668 self.cmd_handler_map.insert(cmd, handler)
669 }
670
671 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
672 let mut send = self.get_send(peer_id.clone()).await?;
673 send.send(cmd, version, body).await
674 }
675
676 async fn send_with_resp(
677 &self,
678 peer_id: &PeerId,
679 cmd: CMD,
680 version: u8,
681 body: &[u8],
682 timeout: Duration,
683 ) -> CmdResult<CmdBody> {
684 let mut send = self.get_send(peer_id.clone()).await?;
685 send.send_with_resp(cmd, version, body, timeout).await
686 }
687
688 async fn send2(
689 &self,
690 peer_id: &PeerId,
691 cmd: CMD,
692 version: u8,
693 body: &[&[u8]],
694 ) -> CmdResult<()> {
695 let mut send = self.get_send(peer_id.clone()).await?;
696 send.send2(cmd, version, body).await
697 }
698
699 async fn send2_with_resp(
700 &self,
701 peer_id: &PeerId,
702 cmd: CMD,
703 version: u8,
704 body: &[&[u8]],
705 timeout: Duration,
706 ) -> CmdResult<CmdBody> {
707 let mut send = self.get_send(peer_id.clone()).await?;
708 send.send2_with_resp(cmd, version, body, timeout).await
709 }
710
711 async fn send_cmd(
712 &self,
713 peer_id: &PeerId,
714 cmd: CMD,
715 version: u8,
716 body: CmdBody,
717 ) -> CmdResult<()> {
718 let mut send = self.get_send(peer_id.clone()).await?;
719 send.send_cmd(cmd, version, body).await
720 }
721
722 async fn send_cmd_with_resp(
723 &self,
724 peer_id: &PeerId,
725 cmd: CMD,
726 version: u8,
727 body: CmdBody,
728 timeout: Duration,
729 ) -> CmdResult<CmdBody> {
730 let mut send = self.get_send(peer_id.clone()).await?;
731 send.send_cmd_with_resp(cmd, version, body, timeout).await
732 }
733
734 async fn send_by_specify_tunnel(
735 &self,
736 peer_id: &PeerId,
737 tunnel_id: TunnelId,
738 cmd: CMD,
739 version: u8,
740 body: &[u8],
741 ) -> CmdResult<()> {
742 let mut send = self
743 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
744 .await?;
745 send.send(cmd, version, body).await
746 }
747
748 async fn send_by_specify_tunnel_with_resp(
749 &self,
750 peer_id: &PeerId,
751 tunnel_id: TunnelId,
752 cmd: CMD,
753 version: u8,
754 body: &[u8],
755 timeout: Duration,
756 ) -> CmdResult<CmdBody> {
757 let mut send = self
758 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
759 .await?;
760 send.send_with_resp(cmd, version, body, timeout).await
761 }
762
763 async fn send2_by_specify_tunnel(
764 &self,
765 peer_id: &PeerId,
766 tunnel_id: TunnelId,
767 cmd: CMD,
768 version: u8,
769 body: &[&[u8]],
770 ) -> CmdResult<()> {
771 let mut send = self
772 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
773 .await?;
774 send.send2(cmd, version, body).await
775 }
776
777 async fn send2_by_specify_tunnel_with_resp(
778 &self,
779 peer_id: &PeerId,
780 tunnel_id: TunnelId,
781 cmd: CMD,
782 version: u8,
783 body: &[&[u8]],
784 timeout: Duration,
785 ) -> CmdResult<CmdBody> {
786 let mut send = self
787 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
788 .await?;
789 send.send2_with_resp(cmd, version, body, timeout).await
790 }
791
792 async fn send_cmd_by_specify_tunnel(
793 &self,
794 peer_id: &PeerId,
795 tunnel_id: TunnelId,
796 cmd: CMD,
797 version: u8,
798 body: CmdBody,
799 ) -> CmdResult<()> {
800 let mut send = self
801 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
802 .await?;
803 send.send_cmd(cmd, version, body).await
804 }
805
806 async fn send_cmd_by_specify_tunnel_with_resp(
807 &self,
808 peer_id: &PeerId,
809 tunnel_id: TunnelId,
810 cmd: CMD,
811 version: u8,
812 body: CmdBody,
813 timeout: Duration,
814 ) -> CmdResult<CmdBody> {
815 let mut send = self
816 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
817 .await?;
818 send.send_cmd_with_resp(cmd, version, body, timeout).await
819 }
820
821 async fn clear_all_tunnel(&self) {
822 self.tunnel_pool.clear_all_worker().await;
823 }
824
825 async fn get_send(
826 &self,
827 peer_id: &PeerId,
828 tunnel_id: TunnelId,
829 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
830 Ok(ClassifiedSendGuard {
831 worker_guard: self
832 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
833 .await?,
834 _p: Default::default(),
835 })
836 }
837}
838
839#[async_trait::async_trait]
840impl<
841 C: WorkerClassification,
842 M: CmdTunnelMeta,
843 R: ClassifiedCmdTunnelRead<C, M>,
844 W: ClassifiedCmdTunnelWrite<C, M>,
845 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
846 LEN: RawEncode
847 + for<'a> RawDecode<'a>
848 + Copy
849 + Send
850 + Sync
851 + 'static
852 + FromPrimitive
853 + ToPrimitive
854 + RawFixedBytes,
855 CMD: RawEncode
856 + for<'a> RawDecode<'a>
857 + Copy
858 + Send
859 + Sync
860 + 'static
861 + RawFixedBytes
862 + Eq
863 + Hash
864 + Debug,
865 LISTENER: CmdTunnelListener<M, R, W>,
866>
867 ClassifiedCmdNode<
868 LEN,
869 CMD,
870 C,
871 M,
872 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
873 ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
874 > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
875{
876 async fn send_by_classified_tunnel(
877 &self,
878 classification: C,
879 cmd: CMD,
880 version: u8,
881 body: &[u8],
882 ) -> CmdResult<()> {
883 let mut send = self.get_classified_send(classification).await?;
884 send.send(cmd, version, body).await
885 }
886
887 async fn send_by_classified_tunnel_with_resp(
888 &self,
889 classification: C,
890 cmd: CMD,
891 version: u8,
892 body: &[u8],
893 timeout: Duration,
894 ) -> CmdResult<CmdBody> {
895 let mut send = self.get_classified_send(classification).await?;
896 send.send_with_resp(cmd, version, body, timeout).await
897 }
898
899 async fn send2_by_classified_tunnel(
900 &self,
901 classification: C,
902 cmd: CMD,
903 version: u8,
904 body: &[&[u8]],
905 ) -> CmdResult<()> {
906 let mut send = self.get_classified_send(classification).await?;
907 send.send2(cmd, version, body).await
908 }
909
910 async fn send2_by_classified_tunnel_with_resp(
911 &self,
912 classification: C,
913 cmd: CMD,
914 version: u8,
915 body: &[&[u8]],
916 timeout: Duration,
917 ) -> CmdResult<CmdBody> {
918 let mut send = self.get_classified_send(classification).await?;
919 send.send2_with_resp(cmd, version, body, timeout).await
920 }
921
922 async fn send_cmd_by_classified_tunnel(
923 &self,
924 classification: C,
925 cmd: CMD,
926 version: u8,
927 body: CmdBody,
928 ) -> CmdResult<()> {
929 let mut send = self.get_classified_send(classification).await?;
930 send.send_cmd(cmd, version, body).await
931 }
932
933 async fn send_cmd_by_classified_tunnel_with_resp(
934 &self,
935 classification: C,
936 cmd: CMD,
937 version: u8,
938 body: CmdBody,
939 timeout: Duration,
940 ) -> CmdResult<CmdBody> {
941 let mut send = self.get_classified_send(classification).await?;
942 send.send_cmd_with_resp(cmd, version, body, timeout).await
943 }
944
945 async fn send_by_peer_classified_tunnel(
946 &self,
947 peer_id: &PeerId,
948 classification: C,
949 cmd: CMD,
950 version: u8,
951 body: &[u8],
952 ) -> CmdResult<()> {
953 let mut send = self
954 .get_peer_classified_send(peer_id.clone(), classification)
955 .await?;
956 send.send(cmd, version, body).await
957 }
958
959 async fn send_by_peer_classified_tunnel_with_resp(
960 &self,
961 peer_id: &PeerId,
962 classification: C,
963 cmd: CMD,
964 version: u8,
965 body: &[u8],
966 timeout: Duration,
967 ) -> CmdResult<CmdBody> {
968 let mut send = self
969 .get_peer_classified_send(peer_id.clone(), classification)
970 .await?;
971 send.send_with_resp(cmd, version, body, timeout).await
972 }
973
974 async fn send2_by_peer_classified_tunnel(
975 &self,
976 peer_id: &PeerId,
977 classification: C,
978 cmd: CMD,
979 version: u8,
980 body: &[&[u8]],
981 ) -> CmdResult<()> {
982 let mut send = self
983 .get_peer_classified_send(peer_id.clone(), classification)
984 .await?;
985 send.send2(cmd, version, body).await
986 }
987
988 async fn send2_by_peer_classified_tunnel_with_resp(
989 &self,
990 peer_id: &PeerId,
991 classification: C,
992 cmd: CMD,
993 version: u8,
994 body: &[&[u8]],
995 timeout: Duration,
996 ) -> CmdResult<CmdBody> {
997 let mut send = self
998 .get_peer_classified_send(peer_id.clone(), classification)
999 .await?;
1000 send.send2_with_resp(cmd, version, body, timeout).await
1001 }
1002
1003 async fn send_cmd_by_peer_classified_tunnel(
1004 &self,
1005 peer_id: &PeerId,
1006 classification: C,
1007 cmd: CMD,
1008 version: u8,
1009 body: CmdBody,
1010 ) -> CmdResult<()> {
1011 let mut send = self
1012 .get_peer_classified_send(peer_id.clone(), classification)
1013 .await?;
1014 send.send_cmd(cmd, version, body).await
1015 }
1016
1017 async fn send_cmd_by_peer_classified_tunnel_with_resp(
1018 &self,
1019 peer_id: &PeerId,
1020 classification: C,
1021 cmd: CMD,
1022 version: u8,
1023 body: CmdBody,
1024 timeout: Duration,
1025 ) -> CmdResult<CmdBody> {
1026 let mut send = self
1027 .get_peer_classified_send(peer_id.clone(), classification)
1028 .await?;
1029 send.send_cmd_with_resp(cmd, version, body, timeout).await
1030 }
1031
1032 async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId> {
1033 let send = self.get_classified_send(classification).await?;
1034 Ok(send.get_tunnel_id())
1035 }
1036
1037 async fn find_tunnel_id_by_peer_classified(
1038 &self,
1039 peer_id: &PeerId,
1040 classification: C,
1041 ) -> CmdResult<TunnelId> {
1042 let send = self
1043 .get_peer_classified_send(peer_id.clone(), classification)
1044 .await?;
1045 Ok(send.get_tunnel_id())
1046 }
1047
1048 async fn get_send_by_classified(
1049 &self,
1050 classification: C,
1051 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1052 Ok(ClassifiedSendGuard {
1053 worker_guard: self.get_classified_send(classification).await?,
1054 _p: Default::default(),
1055 })
1056 }
1057
1058 async fn get_send_by_peer_classified(
1059 &self,
1060 peer_id: &PeerId,
1061 classification: C,
1062 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1063 Ok(ClassifiedSendGuard {
1064 worker_guard: self
1065 .get_peer_classified_send(peer_id.clone(), classification)
1066 .await?,
1067 _p: Default::default(),
1068 })
1069 }
1070}