1use crate::client::{
2 ClassifiedCmdSend, ClassifiedCmdTunnelRead, ClassifiedCmdTunnelWrite, ClassifiedSendGuard,
3 RespWaiter, RespWaiterRef, gen_resp_id,
4};
5use crate::cmd::CmdHandlerMap;
6use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
7use crate::node::create_recv_handle;
8use crate::server::CmdTunnelListener;
9use crate::{
10 ClassifiedCmdNode, CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, PeerId, TunnelId,
11 TunnelIdGenerator, into_pool_err, pool_err,
12};
13use async_named_locker::ObjectHolder;
14use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
15use num::{FromPrimitive, ToPrimitive};
16use sfo_pool::{
17 ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
18 ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification,
19};
20use sfo_split::Splittable;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::hash::Hash;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27#[derive(Debug, Clone, Eq, Hash)]
28pub struct CmdNodeTunnelClassification<C: WorkerClassification> {
29 pub peer_id: Option<PeerId>,
30 pub tunnel_id: Option<TunnelId>,
31 pub classification: Option<C>,
32}
33
34impl<C: WorkerClassification> PartialEq for CmdNodeTunnelClassification<C> {
35 fn eq(&self, other: &Self) -> bool {
36 self.peer_id == other.peer_id
37 && self.tunnel_id == other.tunnel_id
38 && self.classification == other.classification
39 }
40}
41
42impl<C, M, R, W, LEN, CMD> ClassifiedWorker<CmdNodeTunnelClassification<C>>
43 for ClassifiedCmdSend<C, M, R, W, LEN, CMD>
44where
45 C: WorkerClassification,
46 M: CmdTunnelMeta,
47 R: ClassifiedCmdTunnelRead<C, M>,
48 W: ClassifiedCmdTunnelWrite<C, M>,
49 LEN: RawEncode
50 + for<'a> RawDecode<'a>
51 + Copy
52 + Send
53 + Sync
54 + 'static
55 + FromPrimitive
56 + ToPrimitive,
57 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
58{
59 fn is_work(&self) -> bool {
60 self.is_work && !self.recv_handle.is_finished()
61 }
62
63 fn is_valid(&self, c: CmdNodeTunnelClassification<C>) -> bool {
64 if c.peer_id.is_some() && c.peer_id.as_ref().unwrap() != &self.remote_id {
65 return false;
66 }
67
68 if c.tunnel_id.is_some() {
69 self.tunnel_id == c.tunnel_id.unwrap()
70 } else {
71 if c.classification.is_some() {
72 self.classification == c.classification.unwrap()
73 } else {
74 true
75 }
76 }
77 }
78
79 fn classification(&self) -> CmdNodeTunnelClassification<C> {
80 CmdNodeTunnelClassification {
81 peer_id: self.pool_peer_id.clone(),
82 tunnel_id: self.pool_tunnel_id,
83 classification: self.pool_classification.clone(),
84 }
85 }
86}
87
88#[async_trait::async_trait]
89pub trait ClassifiedCmdNodeTunnelFactory<
90 C: WorkerClassification,
91 M: CmdTunnelMeta,
92 R: ClassifiedCmdTunnelRead<C, M>,
93 W: ClassifiedCmdTunnelWrite<C, M>,
94>: Send + Sync + 'static
95{
96 async fn create_tunnel(
97 &self,
98 classification: Option<CmdNodeTunnelClassification<C>>,
99 ) -> CmdResult<Splittable<R, W>>;
100}
101
102struct CmdWriteFactoryImpl<
103 C: WorkerClassification,
104 M: CmdTunnelMeta,
105 R: ClassifiedCmdTunnelRead<C, M>,
106 W: ClassifiedCmdTunnelWrite<C, M>,
107 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
108 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
109 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
110 LISTENER: CmdTunnelListener<M, R, W>,
111> {
112 tunnel_listener: LISTENER,
113 tunnel_factory: F,
114 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
115 tunnel_id_generator: TunnelIdGenerator,
116 resp_waiter: RespWaiterRef,
117 send_cache: Arc<Mutex<HashMap<PeerId, Vec<ClassifiedCmdSend<C, M, R, W, LEN, CMD>>>>>,
118}
119
120impl<
121 C: WorkerClassification,
122 M: CmdTunnelMeta,
123 R: ClassifiedCmdTunnelRead<C, M>,
124 W: ClassifiedCmdTunnelWrite<C, M>,
125 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
126 LEN: RawEncode
127 + for<'a> RawDecode<'a>
128 + Copy
129 + Send
130 + Sync
131 + 'static
132 + FromPrimitive
133 + ToPrimitive
134 + RawFixedBytes,
135 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
136 LISTENER: CmdTunnelListener<M, R, W>,
137> CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
138{
139 pub fn new(
140 tunnel_factory: F,
141 tunnel_listener: LISTENER,
142 cmd_handler: impl CmdHandler<LEN, CMD>,
143 resp_waiter: RespWaiterRef,
144 ) -> Self {
145 Self {
146 tunnel_listener,
147 tunnel_factory,
148 cmd_handler: Arc::new(cmd_handler),
149 tunnel_id_generator: TunnelIdGenerator::new(),
150 resp_waiter,
151 send_cache: Arc::new(Mutex::new(Default::default())),
152 }
153 }
154
155 pub fn start(self: &Arc<Self>) {
156 let this = self.clone();
157 tokio::spawn(async move {
158 if let Err(e) = this.run().await {
159 log::error!("cmd server error: {:?}", e);
160 }
161 });
162 }
163
164 async fn run(self: &Arc<Self>) -> CmdResult<()> {
165 loop {
166 let tunnel = self.tunnel_listener.accept().await?;
167 let peer_id = tunnel.get_remote_peer_id();
168 let classification = tunnel.get_classification();
169 let tunnel_id = self.tunnel_id_generator.generate();
170 let this = self.clone();
171 let resp_waiter = self.resp_waiter.clone();
172 tokio::spawn(async move {
173 let ret: CmdResult<()> = async move {
174 let this = this.clone();
175 let cmd_handler = this.cmd_handler.clone();
176 let (reader, writer) = tunnel.split();
177 let tunnel_meta = reader.get_tunnel_meta();
178 let remote_id = writer.get_remote_peer_id();
179 let writer = ObjectHolder::new(writer);
180 let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
181 reader,
182 writer.clone(),
183 tunnel_id,
184 cmd_handler,
185 );
186 {
187 let mut send_cache = this.send_cache.lock().unwrap();
188 let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
189 send_list.push(ClassifiedCmdSend::new(
190 tunnel_id,
191 classification.clone(),
192 Some(remote_id.clone()),
193 Some(tunnel_id),
194 Some(classification),
195 recv_handle,
196 writer,
197 resp_waiter,
198 remote_id,
199 tunnel_meta,
200 ));
201 }
202 Ok(())
203 }
204 .await;
205 if let Err(e) = ret {
206 log::error!("peer connection error: {:?}", e);
207 }
208 });
209 }
210 }
211}
212
213#[async_trait::async_trait]
214impl<
215 C: WorkerClassification,
216 M: CmdTunnelMeta,
217 R: ClassifiedCmdTunnelRead<C, M>,
218 W: ClassifiedCmdTunnelWrite<C, M>,
219 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
220 LEN: RawEncode
221 + for<'a> RawDecode<'a>
222 + Copy
223 + Send
224 + Sync
225 + 'static
226 + FromPrimitive
227 + ToPrimitive
228 + RawFixedBytes,
229 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
230 LISTENER: CmdTunnelListener<M, R, W>,
231> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
232 for CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>
233{
234 async fn create(
235 &self,
236 c: Option<CmdNodeTunnelClassification<C>>,
237 ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
238 if c.is_some() {
239 let classification = c.unwrap();
240 if classification.peer_id.is_some() {
241 let peer_id = classification.peer_id.clone().unwrap();
242 let tunnel_id = classification.tunnel_id;
243 if tunnel_id.is_some() {
244 let mut send_cache = self.send_cache.lock().unwrap();
245 if let Some(send_list) = send_cache.get_mut(&peer_id) {
246 let mut send_index = None;
247 for (index, send) in send_list.iter().enumerate() {
248 if send.get_tunnel_id() == tunnel_id.unwrap() {
249 send_index = Some(index);
250 break;
251 }
252 }
253 if let Some(send_index) = send_index {
254 let mut send = send_list.remove(send_index);
255 send.set_pool_key(
256 Some(peer_id.clone()),
257 tunnel_id,
258 classification.classification.clone(),
259 );
260 Ok(send)
261 } else {
262 Err(pool_err!(
263 PoolErrorCode::Failed,
264 "tunnel {:?} not found",
265 tunnel_id.unwrap()
266 ))
267 }
268 } else {
269 Err(pool_err!(
270 PoolErrorCode::Failed,
271 "tunnel {:?} not found",
272 tunnel_id.unwrap()
273 ))
274 }
275 } else {
276 {
277 let mut send_cache = self.send_cache.lock().unwrap();
278 if let Some(send_list) = send_cache.get_mut(&peer_id) {
279 if !send_list.is_empty() {
280 let mut send = send_list.pop().unwrap();
281 send.set_pool_key(
282 Some(peer_id.clone()),
283 tunnel_id,
284 classification.classification.clone(),
285 );
286 if send_list.is_empty() {
287 send_cache.remove(&peer_id);
288 }
289 return Ok(send);
290 }
291 }
292 }
293 let tunnel = self
294 .tunnel_factory
295 .create_tunnel(Some(classification.clone()))
296 .await
297 .map_err(into_pool_err!(PoolErrorCode::Failed))?;
298 let actual_classification = tunnel.get_classification();
299 let pool_peer_id = classification.peer_id.clone();
300 let pool_tunnel_id = classification.tunnel_id;
301 let pool_classification = classification.classification.clone();
302 let tunnel_id = self.tunnel_id_generator.generate();
303 let (recv, write) = tunnel.split();
304 let remote_id = write.get_remote_peer_id();
305 let tunnel_meta = recv.get_tunnel_meta();
306 let write = ObjectHolder::new(write);
307 let cmd_handler = self.cmd_handler.clone();
308 let handle = create_recv_handle::<M, R, W, LEN, CMD>(
309 recv,
310 write.clone(),
311 tunnel_id,
312 cmd_handler,
313 );
314 Ok(ClassifiedCmdSend::new(
315 tunnel_id,
316 actual_classification,
317 pool_peer_id,
318 pool_tunnel_id,
319 pool_classification,
320 handle,
321 write,
322 self.resp_waiter.clone(),
323 remote_id,
324 tunnel_meta,
325 ))
326 }
327 } else {
328 if classification.tunnel_id.is_some() {
329 Err(pool_err!(
330 PoolErrorCode::Failed,
331 "must set peer id when set tunnel id"
332 ))
333 } else {
334 let tunnel = self
335 .tunnel_factory
336 .create_tunnel(Some(classification.clone()))
337 .await
338 .map_err(into_pool_err!(PoolErrorCode::Failed))?;
339 let actual_classification = tunnel.get_classification();
340 let pool_peer_id = classification.peer_id.clone();
341 let pool_tunnel_id = classification.tunnel_id;
342 let pool_classification = classification.classification.clone();
343 let tunnel_id = self.tunnel_id_generator.generate();
344 let (recv, write) = tunnel.split();
345 let remote_id = write.get_remote_peer_id();
346 let tunnel_meta = write.get_tunnel_meta();
347 let write = ObjectHolder::new(write);
348 let cmd_handler = self.cmd_handler.clone();
349 let handle = create_recv_handle::<M, R, W, LEN, CMD>(
350 recv,
351 write.clone(),
352 tunnel_id,
353 cmd_handler,
354 );
355 Ok(ClassifiedCmdSend::new(
356 tunnel_id,
357 actual_classification,
358 pool_peer_id,
359 pool_tunnel_id,
360 pool_classification,
361 handle,
362 write,
363 self.resp_waiter.clone(),
364 remote_id,
365 tunnel_meta,
366 ))
367 }
368 }
369 } else {
370 Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
371 }
372 }
373}
374
375pub struct ClassifiedCmdNodeWriteFactory<
376 C: WorkerClassification,
377 M: CmdTunnelMeta,
378 R: ClassifiedCmdTunnelRead<C, M>,
379 W: ClassifiedCmdTunnelWrite<C, M>,
380 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
381 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
382 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
383 LISTENER: CmdTunnelListener<M, R, W>,
384> {
385 inner: Arc<CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>>,
386}
387
388impl<
389 C: WorkerClassification,
390 M: CmdTunnelMeta,
391 R: ClassifiedCmdTunnelRead<C, M>,
392 W: ClassifiedCmdTunnelWrite<C, M>,
393 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
394 LEN: RawEncode
395 + for<'a> RawDecode<'a>
396 + Copy
397 + Send
398 + Sync
399 + 'static
400 + FromPrimitive
401 + ToPrimitive
402 + RawFixedBytes
403 + RawFixedBytes,
404 CMD: RawEncode
405 + for<'a> RawDecode<'a>
406 + Copy
407 + Send
408 + Sync
409 + 'static
410 + Debug
411 + RawFixedBytes
412 + RawFixedBytes,
413 LISTENER: CmdTunnelListener<M, R, W>,
414> ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
415{
416 pub(crate) fn new(
417 tunnel_factory: F,
418 tunnel_listener: LISTENER,
419 cmd_handler: impl CmdHandler<LEN, CMD>,
420 resp_waiter: RespWaiterRef,
421 ) -> Self {
422 Self {
423 inner: Arc::new(CmdWriteFactoryImpl::new(
424 tunnel_factory,
425 tunnel_listener,
426 cmd_handler,
427 resp_waiter,
428 )),
429 }
430 }
431
432 pub fn start(&self) {
433 self.inner.start();
434 }
435}
436
437#[async_trait::async_trait]
438impl<
439 C: WorkerClassification,
440 M: CmdTunnelMeta,
441 R: ClassifiedCmdTunnelRead<C, M>,
442 W: ClassifiedCmdTunnelWrite<C, M>,
443 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
444 LEN: RawEncode
445 + for<'a> RawDecode<'a>
446 + Copy
447 + Send
448 + Sync
449 + 'static
450 + FromPrimitive
451 + ToPrimitive
452 + RawFixedBytes,
453 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
454 LISTENER: CmdTunnelListener<M, R, W>,
455> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>>
456 for ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>
457{
458 async fn create(
459 &self,
460 c: Option<CmdNodeTunnelClassification<C>>,
461 ) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
462 self.inner.create(c).await
463 }
464}
465
466pub struct DefaultClassifiedCmdNode<
467 C: WorkerClassification,
468 M: CmdTunnelMeta,
469 R: ClassifiedCmdTunnelRead<C, M>,
470 W: ClassifiedCmdTunnelWrite<C, M>,
471 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
472 LEN: RawEncode
473 + for<'a> RawDecode<'a>
474 + Copy
475 + Send
476 + Sync
477 + 'static
478 + FromPrimitive
479 + ToPrimitive
480 + RawFixedBytes,
481 CMD: RawEncode
482 + for<'a> RawDecode<'a>
483 + Copy
484 + Send
485 + Sync
486 + 'static
487 + RawFixedBytes
488 + Eq
489 + Hash
490 + Debug,
491 LISTENER: CmdTunnelListener<M, R, W>,
492> {
493 tunnel_pool: ClassifiedWorkerPoolRef<
494 CmdNodeTunnelClassification<C>,
495 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
496 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
497 >,
498 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
499}
500
501impl<
502 C: WorkerClassification,
503 M: CmdTunnelMeta,
504 R: ClassifiedCmdTunnelRead<C, M>,
505 W: ClassifiedCmdTunnelWrite<C, M>,
506 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
507 LEN: RawEncode
508 + for<'a> RawDecode<'a>
509 + Copy
510 + Send
511 + Sync
512 + 'static
513 + FromPrimitive
514 + ToPrimitive
515 + RawFixedBytes,
516 CMD: RawEncode
517 + for<'a> RawDecode<'a>
518 + Copy
519 + Send
520 + Sync
521 + 'static
522 + RawFixedBytes
523 + Eq
524 + Hash
525 + Debug,
526 LISTENER: CmdTunnelListener<M, R, W>,
527> DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
528{
529 pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
530 let cmd_handler_map = Arc::new(CmdHandlerMap::new());
531 let handler_map = cmd_handler_map.clone();
532 let resp_waiter = Arc::new(RespWaiter::new());
533 let waiter = resp_waiter.clone();
534 let write_factory = ClassifiedCmdNodeWriteFactory::<C, M, R, W, _, LEN, CMD, LISTENER>::new(
535 factory,
536 listener,
537 move |local_id: PeerId,
538 peer_id: PeerId,
539 tunnel_id: TunnelId,
540 header: CmdHeader<LEN, CMD>,
541 body_read: CmdBody| {
542 let handler_map = handler_map.clone();
543 let waiter = waiter.clone();
544 async move {
545 if header.is_resp() && header.seq().is_some() {
546 let resp_id =
547 gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
548 let _ = waiter.set_result(resp_id, body_read);
549 Ok(None)
550 } else {
551 if let Some(handler) = handler_map.get(header.cmd_code()) {
552 handler
553 .handle(local_id, peer_id, tunnel_id, header, body_read)
554 .await
555 } else {
556 Ok(None)
557 }
558 }
559 }
560 },
561 resp_waiter.clone(),
562 );
563 write_factory.start();
564 Arc::new(Self {
565 tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
566 cmd_handler_map,
567 })
568 }
569
570 async fn get_send(
571 &self,
572 peer_id: PeerId,
573 ) -> CmdResult<
574 ClassifiedWorkerGuard<
575 CmdNodeTunnelClassification<C>,
576 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
577 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
578 >,
579 > {
580 self.tunnel_pool
581 .get_classified_worker(CmdNodeTunnelClassification {
582 peer_id: Some(peer_id),
583 tunnel_id: None,
584 classification: None,
585 })
586 .await
587 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
588 }
589
590 async fn get_send_of_tunnel_id(
591 &self,
592 peer_id: PeerId,
593 tunnel_id: TunnelId,
594 ) -> CmdResult<
595 ClassifiedWorkerGuard<
596 CmdNodeTunnelClassification<C>,
597 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
598 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
599 >,
600 > {
601 self.tunnel_pool
602 .get_classified_worker(CmdNodeTunnelClassification {
603 peer_id: Some(peer_id),
604 tunnel_id: Some(tunnel_id),
605 classification: None,
606 })
607 .await
608 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
609 }
610
611 async fn get_classified_send(
612 &self,
613 classification: C,
614 ) -> CmdResult<
615 ClassifiedWorkerGuard<
616 CmdNodeTunnelClassification<C>,
617 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
618 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
619 >,
620 > {
621 self.tunnel_pool
622 .get_classified_worker(CmdNodeTunnelClassification {
623 peer_id: None,
624 tunnel_id: None,
625 classification: Some(classification),
626 })
627 .await
628 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
629 }
630
631 async fn get_peer_classified_send(
632 &self,
633 peer_id: PeerId,
634 classification: C,
635 ) -> CmdResult<
636 ClassifiedWorkerGuard<
637 CmdNodeTunnelClassification<C>,
638 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
639 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
640 >,
641 > {
642 self.tunnel_pool
643 .get_classified_worker(CmdNodeTunnelClassification {
644 peer_id: Some(peer_id),
645 tunnel_id: None,
646 classification: Some(classification),
647 })
648 .await
649 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
650 }
651}
652
653pub type ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
654 CmdNodeTunnelClassification<C>,
655 M,
656 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
657 ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>,
658>;
659#[async_trait::async_trait]
660impl<
661 C: WorkerClassification,
662 M: CmdTunnelMeta,
663 R: ClassifiedCmdTunnelRead<C, M>,
664 W: ClassifiedCmdTunnelWrite<C, M>,
665 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
666 LEN: RawEncode
667 + for<'a> RawDecode<'a>
668 + Copy
669 + Send
670 + Sync
671 + 'static
672 + FromPrimitive
673 + ToPrimitive
674 + RawFixedBytes,
675 CMD: RawEncode
676 + for<'a> RawDecode<'a>
677 + Copy
678 + Send
679 + Sync
680 + 'static
681 + RawFixedBytes
682 + Eq
683 + Hash
684 + Debug,
685 LISTENER: CmdTunnelListener<M, R, W>,
686>
687 CmdNode<
688 LEN,
689 CMD,
690 M,
691 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
692 ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
693 > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
694{
695 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
696 self.cmd_handler_map.insert(cmd, handler)
697 }
698
699 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
700 let mut send = self.get_send(peer_id.clone()).await?;
701 send.send(cmd, version, body).await
702 }
703
704 async fn send_with_resp(
705 &self,
706 peer_id: &PeerId,
707 cmd: CMD,
708 version: u8,
709 body: &[u8],
710 timeout: Duration,
711 ) -> CmdResult<CmdBody> {
712 let mut send = self.get_send(peer_id.clone()).await?;
713 send.send_with_resp(cmd, version, body, timeout).await
714 }
715
716 async fn send_parts(
717 &self,
718 peer_id: &PeerId,
719 cmd: CMD,
720 version: u8,
721 body: &[&[u8]],
722 ) -> CmdResult<()> {
723 let mut send = self.get_send(peer_id.clone()).await?;
724 send.send_parts(cmd, version, body).await
725 }
726
727 async fn send_parts_with_resp(
728 &self,
729 peer_id: &PeerId,
730 cmd: CMD,
731 version: u8,
732 body: &[&[u8]],
733 timeout: Duration,
734 ) -> CmdResult<CmdBody> {
735 let mut send = self.get_send(peer_id.clone()).await?;
736 send.send_parts_with_resp(cmd, version, body, timeout).await
737 }
738
739 async fn send_cmd(
740 &self,
741 peer_id: &PeerId,
742 cmd: CMD,
743 version: u8,
744 body: CmdBody,
745 ) -> CmdResult<()> {
746 let mut send = self.get_send(peer_id.clone()).await?;
747 send.send_cmd(cmd, version, body).await
748 }
749
750 async fn send_cmd_with_resp(
751 &self,
752 peer_id: &PeerId,
753 cmd: CMD,
754 version: u8,
755 body: CmdBody,
756 timeout: Duration,
757 ) -> CmdResult<CmdBody> {
758 let mut send = self.get_send(peer_id.clone()).await?;
759 send.send_cmd_with_resp(cmd, version, body, timeout).await
760 }
761
762 async fn send_by_specify_tunnel(
763 &self,
764 peer_id: &PeerId,
765 tunnel_id: TunnelId,
766 cmd: CMD,
767 version: u8,
768 body: &[u8],
769 ) -> CmdResult<()> {
770 let mut send = self
771 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
772 .await?;
773 send.send(cmd, version, body).await
774 }
775
776 async fn send_by_specify_tunnel_with_resp(
777 &self,
778 peer_id: &PeerId,
779 tunnel_id: TunnelId,
780 cmd: CMD,
781 version: u8,
782 body: &[u8],
783 timeout: Duration,
784 ) -> CmdResult<CmdBody> {
785 let mut send = self
786 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
787 .await?;
788 send.send_with_resp(cmd, version, body, timeout).await
789 }
790
791 async fn send_parts_by_specify_tunnel(
792 &self,
793 peer_id: &PeerId,
794 tunnel_id: TunnelId,
795 cmd: CMD,
796 version: u8,
797 body: &[&[u8]],
798 ) -> CmdResult<()> {
799 let mut send = self
800 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
801 .await?;
802 send.send_parts(cmd, version, body).await
803 }
804
805 async fn send_parts_by_specify_tunnel_with_resp(
806 &self,
807 peer_id: &PeerId,
808 tunnel_id: TunnelId,
809 cmd: CMD,
810 version: u8,
811 body: &[&[u8]],
812 timeout: Duration,
813 ) -> CmdResult<CmdBody> {
814 let mut send = self
815 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
816 .await?;
817 send.send_parts_with_resp(cmd, version, body, timeout).await
818 }
819
820 async fn send_cmd_by_specify_tunnel(
821 &self,
822 peer_id: &PeerId,
823 tunnel_id: TunnelId,
824 cmd: CMD,
825 version: u8,
826 body: CmdBody,
827 ) -> CmdResult<()> {
828 let mut send = self
829 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
830 .await?;
831 send.send_cmd(cmd, version, body).await
832 }
833
834 async fn send_cmd_by_specify_tunnel_with_resp(
835 &self,
836 peer_id: &PeerId,
837 tunnel_id: TunnelId,
838 cmd: CMD,
839 version: u8,
840 body: CmdBody,
841 timeout: Duration,
842 ) -> CmdResult<CmdBody> {
843 let mut send = self
844 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
845 .await?;
846 send.send_cmd_with_resp(cmd, version, body, timeout).await
847 }
848
849 async fn clear_all_tunnel(&self) {
850 self.tunnel_pool.clear_all_worker().await;
851 }
852
853 async fn get_send(
854 &self,
855 peer_id: &PeerId,
856 tunnel_id: TunnelId,
857 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
858 Ok(ClassifiedSendGuard {
859 worker_guard: self
860 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
861 .await?,
862 _p: Default::default(),
863 })
864 }
865}
866
867#[async_trait::async_trait]
868impl<
869 C: WorkerClassification,
870 M: CmdTunnelMeta,
871 R: ClassifiedCmdTunnelRead<C, M>,
872 W: ClassifiedCmdTunnelWrite<C, M>,
873 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
874 LEN: RawEncode
875 + for<'a> RawDecode<'a>
876 + Copy
877 + Send
878 + Sync
879 + 'static
880 + FromPrimitive
881 + ToPrimitive
882 + RawFixedBytes,
883 CMD: RawEncode
884 + for<'a> RawDecode<'a>
885 + Copy
886 + Send
887 + Sync
888 + 'static
889 + RawFixedBytes
890 + Eq
891 + Hash
892 + Debug,
893 LISTENER: CmdTunnelListener<M, R, W>,
894>
895 ClassifiedCmdNode<
896 LEN,
897 CMD,
898 C,
899 M,
900 ClassifiedCmdSend<C, M, R, W, LEN, CMD>,
901 ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>,
902 > for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER>
903{
904 async fn send_by_classified_tunnel(
905 &self,
906 classification: C,
907 cmd: CMD,
908 version: u8,
909 body: &[u8],
910 ) -> CmdResult<()> {
911 let mut send = self.get_classified_send(classification).await?;
912 send.send(cmd, version, body).await
913 }
914
915 async fn send_by_classified_tunnel_with_resp(
916 &self,
917 classification: C,
918 cmd: CMD,
919 version: u8,
920 body: &[u8],
921 timeout: Duration,
922 ) -> CmdResult<CmdBody> {
923 let mut send = self.get_classified_send(classification).await?;
924 send.send_with_resp(cmd, version, body, timeout).await
925 }
926
927 async fn send_parts_by_classified_tunnel(
928 &self,
929 classification: C,
930 cmd: CMD,
931 version: u8,
932 body: &[&[u8]],
933 ) -> CmdResult<()> {
934 let mut send = self.get_classified_send(classification).await?;
935 send.send_parts(cmd, version, body).await
936 }
937
938 async fn send_parts_by_classified_tunnel_with_resp(
939 &self,
940 classification: C,
941 cmd: CMD,
942 version: u8,
943 body: &[&[u8]],
944 timeout: Duration,
945 ) -> CmdResult<CmdBody> {
946 let mut send = self.get_classified_send(classification).await?;
947 send.send_parts_with_resp(cmd, version, body, timeout).await
948 }
949
950 async fn send_cmd_by_classified_tunnel(
951 &self,
952 classification: C,
953 cmd: CMD,
954 version: u8,
955 body: CmdBody,
956 ) -> CmdResult<()> {
957 let mut send = self.get_classified_send(classification).await?;
958 send.send_cmd(cmd, version, body).await
959 }
960
961 async fn send_cmd_by_classified_tunnel_with_resp(
962 &self,
963 classification: C,
964 cmd: CMD,
965 version: u8,
966 body: CmdBody,
967 timeout: Duration,
968 ) -> CmdResult<CmdBody> {
969 let mut send = self.get_classified_send(classification).await?;
970 send.send_cmd_with_resp(cmd, version, body, timeout).await
971 }
972
973 async fn send_by_peer_classified_tunnel(
974 &self,
975 peer_id: &PeerId,
976 classification: C,
977 cmd: CMD,
978 version: u8,
979 body: &[u8],
980 ) -> CmdResult<()> {
981 let mut send = self
982 .get_peer_classified_send(peer_id.clone(), classification)
983 .await?;
984 send.send(cmd, version, body).await
985 }
986
987 async fn send_by_peer_classified_tunnel_with_resp(
988 &self,
989 peer_id: &PeerId,
990 classification: C,
991 cmd: CMD,
992 version: u8,
993 body: &[u8],
994 timeout: Duration,
995 ) -> CmdResult<CmdBody> {
996 let mut send = self
997 .get_peer_classified_send(peer_id.clone(), classification)
998 .await?;
999 send.send_with_resp(cmd, version, body, timeout).await
1000 }
1001
1002 async fn send_parts_by_peer_classified_tunnel(
1003 &self,
1004 peer_id: &PeerId,
1005 classification: C,
1006 cmd: CMD,
1007 version: u8,
1008 body: &[&[u8]],
1009 ) -> CmdResult<()> {
1010 let mut send = self
1011 .get_peer_classified_send(peer_id.clone(), classification)
1012 .await?;
1013 send.send_parts(cmd, version, body).await
1014 }
1015
1016 async fn send_parts_by_peer_classified_tunnel_with_resp(
1017 &self,
1018 peer_id: &PeerId,
1019 classification: C,
1020 cmd: CMD,
1021 version: u8,
1022 body: &[&[u8]],
1023 timeout: Duration,
1024 ) -> CmdResult<CmdBody> {
1025 let mut send = self
1026 .get_peer_classified_send(peer_id.clone(), classification)
1027 .await?;
1028 send.send_parts_with_resp(cmd, version, body, timeout).await
1029 }
1030
1031 async fn send_cmd_by_peer_classified_tunnel(
1032 &self,
1033 peer_id: &PeerId,
1034 classification: C,
1035 cmd: CMD,
1036 version: u8,
1037 body: CmdBody,
1038 ) -> CmdResult<()> {
1039 let mut send = self
1040 .get_peer_classified_send(peer_id.clone(), classification)
1041 .await?;
1042 send.send_cmd(cmd, version, body).await
1043 }
1044
1045 async fn send_cmd_by_peer_classified_tunnel_with_resp(
1046 &self,
1047 peer_id: &PeerId,
1048 classification: C,
1049 cmd: CMD,
1050 version: u8,
1051 body: CmdBody,
1052 timeout: Duration,
1053 ) -> CmdResult<CmdBody> {
1054 let mut send = self
1055 .get_peer_classified_send(peer_id.clone(), classification)
1056 .await?;
1057 send.send_cmd_with_resp(cmd, version, body, timeout).await
1058 }
1059
1060 async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId> {
1061 let send = self.get_classified_send(classification).await?;
1062 Ok(send.get_tunnel_id())
1063 }
1064
1065 async fn find_tunnel_id_by_peer_classified(
1066 &self,
1067 peer_id: &PeerId,
1068 classification: C,
1069 ) -> CmdResult<TunnelId> {
1070 let send = self
1071 .get_peer_classified_send(peer_id.clone(), classification)
1072 .await?;
1073 Ok(send.get_tunnel_id())
1074 }
1075
1076 async fn get_send_by_classified(
1077 &self,
1078 classification: C,
1079 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1080 Ok(ClassifiedSendGuard {
1081 worker_guard: self.get_classified_send(classification).await?,
1082 _p: Default::default(),
1083 })
1084 }
1085
1086 async fn get_send_by_peer_classified(
1087 &self,
1088 peer_id: &PeerId,
1089 classification: C,
1090 ) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
1091 Ok(ClassifiedSendGuard {
1092 worker_guard: self
1093 .get_peer_classified_send(peer_id.clone(), classification)
1094 .await?,
1095 _p: Default::default(),
1096 })
1097 }
1098}