1use crate::client::{
2 CommonCmdSend, RespWaiter, RespWaiterRef, TrackedSendGuard, TunnelReserveResult,
3 TunnelRuntimeRegistry, gen_resp_id,
4};
5use crate::cmd::CmdHandlerMap;
6use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
7use crate::node::create_recv_handle;
8use crate::server::CmdTunnelListener;
9use crate::{
10 CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId,
11 TunnelId, TunnelIdGenerator, into_pool_err, pool_err,
12};
13use async_named_locker::ObjectHolder;
14use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
15use num::{FromPrimitive, ToPrimitive};
16use sfo_pool::{
17 ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
18 ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult,
19};
20use sfo_split::Splittable;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::hash::Hash;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26use tokio::task::yield_now;
27
28#[async_trait::async_trait]
29pub trait CmdNodeTunnelFactory<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
30 Send + Sync + 'static
31{
32 async fn create_tunnel(&self, remote_id: &PeerId) -> CmdResult<Splittable<R, W>>;
33}
34
35impl<M, R, W, LEN, CMD> ClassifiedWorker<(PeerId, Option<TunnelId>)>
36 for CommonCmdSend<M, R, W, LEN, CMD>
37where
38 M: CmdTunnelMeta,
39 R: CmdTunnelRead<M>,
40 W: CmdTunnelWrite<M>,
41 LEN: RawEncode
42 + for<'a> RawDecode<'a>
43 + Copy
44 + Send
45 + Sync
46 + 'static
47 + FromPrimitive
48 + ToPrimitive,
49 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
50{
51 fn is_work(&self) -> bool {
52 self.is_work && !self.recv_handle.is_finished()
53 }
54
55 fn is_valid(&self, c: (PeerId, Option<TunnelId>)) -> bool {
56 let (peer_id, tunnel_id) = c;
57 if tunnel_id.is_some() {
58 self.tunnel_id == tunnel_id.unwrap() && peer_id == self.remote_id
59 } else {
60 peer_id == self.remote_id
61 }
62 }
63
64 fn classification(&self) -> (PeerId, Option<TunnelId>) {
65 (self.remote_id.clone(), None)
66 }
67}
68
69struct CmdWriteFactoryImpl<
70 M: CmdTunnelMeta,
71 R: CmdTunnelRead<M>,
72 W: CmdTunnelWrite<M>,
73 F: CmdNodeTunnelFactory<M, R, W>,
74 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
75 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
76 LISTENER: CmdTunnelListener<M, R, W>,
77> {
78 tunnel_listener: LISTENER,
79 tunnel_factory: F,
80 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
81 tunnel_id_generator: TunnelIdGenerator,
82 resp_waiter: RespWaiterRef,
83 send_cache: Arc<Mutex<HashMap<PeerId, Vec<CommonCmdSend<M, R, W, LEN, CMD>>>>>,
84}
85
86impl<
87 M: CmdTunnelMeta,
88 R: CmdTunnelRead<M>,
89 W: CmdTunnelWrite<M>,
90 F: CmdNodeTunnelFactory<M, R, W>,
91 LEN: RawEncode
92 + for<'a> RawDecode<'a>
93 + Copy
94 + Send
95 + Sync
96 + 'static
97 + FromPrimitive
98 + ToPrimitive
99 + RawFixedBytes,
100 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
101 LISTENER: CmdTunnelListener<M, R, W>,
102> CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
103{
104 pub fn new(
105 tunnel_factory: F,
106 tunnel_listener: LISTENER,
107 cmd_handler: impl CmdHandler<LEN, CMD>,
108 resp_waiter: RespWaiterRef,
109 ) -> Self {
110 Self {
111 tunnel_listener,
112 tunnel_factory,
113 cmd_handler: Arc::new(cmd_handler),
114 tunnel_id_generator: TunnelIdGenerator::new(),
115 resp_waiter,
116 send_cache: Arc::new(Mutex::new(Default::default())),
117 }
118 }
119
120 pub fn start(self: &Arc<Self>) {
121 let this = self.clone();
122 tokio::spawn(async move {
123 if let Err(e) = this.run().await {
124 log::error!("cmd server error: {:?}", e);
125 }
126 });
127 }
128
129 async fn run(self: &Arc<Self>) -> CmdResult<()> {
130 loop {
131 let tunnel = self.tunnel_listener.accept().await?;
132 let peer_id = tunnel.get_remote_peer_id();
133 let tunnel_id = self.tunnel_id_generator.generate();
134 let resp_waiter = self.resp_waiter.clone();
135 let this = self.clone();
136 tokio::spawn(async move {
137 let ret: CmdResult<()> = async move {
138 let this = this.clone();
139 let cmd_handler = this.cmd_handler.clone();
140 let (reader, writer) = tunnel.split();
141 let remote_id = reader.get_remote_peer_id();
142 let tunnel_meta = reader.get_tunnel_meta();
143 let writer = ObjectHolder::new(writer);
144 let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
145 reader,
146 writer.clone(),
147 tunnel_id,
148 cmd_handler,
149 );
150 {
151 let mut send_cache = this.send_cache.lock().unwrap();
152 let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
153 send_list.push(CommonCmdSend::new(
154 tunnel_id,
155 recv_handle,
156 writer,
157 resp_waiter,
158 remote_id,
159 tunnel_meta,
160 ));
161 }
162 Ok(())
163 }
164 .await;
165 if let Err(e) = ret {
166 log::error!("peer connection error: {:?}", e);
167 }
168 });
169 }
170 }
171}
172
173#[async_trait::async_trait]
174impl<
175 M: CmdTunnelMeta,
176 R: CmdTunnelRead<M>,
177 W: CmdTunnelWrite<M>,
178 F: CmdNodeTunnelFactory<M, R, W>,
179 LEN: RawEncode
180 + for<'a> RawDecode<'a>
181 + Copy
182 + Send
183 + Sync
184 + 'static
185 + FromPrimitive
186 + ToPrimitive
187 + RawFixedBytes,
188 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
189 LISTENER: CmdTunnelListener<M, R, W>,
190> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
191 for CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
192{
193 async fn create(
194 &self,
195 c: Option<(PeerId, Option<TunnelId>)>,
196 ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
197 if c.is_some() {
198 let (peer_id, tunnel_id) = c.unwrap();
199 if tunnel_id.is_some() {
200 let mut send_cache = self.send_cache.lock().unwrap();
201 if let Some(send_list) = send_cache.get_mut(&peer_id) {
202 let mut send_index = None;
203 for (index, send) in send_list.iter().enumerate() {
204 if send.get_tunnel_id() == tunnel_id.unwrap() {
205 send_index = Some(index);
206 break;
207 }
208 }
209 if let Some(send_index) = send_index {
210 let send = send_list.remove(send_index);
211 Ok(send)
212 } else {
213 Err(pool_err!(
214 PoolErrorCode::Failed,
215 "tunnel {:?} not found",
216 tunnel_id.unwrap()
217 ))
218 }
219 } else {
220 Err(pool_err!(
221 PoolErrorCode::Failed,
222 "tunnel {:?} not found",
223 tunnel_id.unwrap()
224 ))
225 }
226 } else {
227 {
228 let mut send_cache = self.send_cache.lock().unwrap();
229 if let Some(send_list) = send_cache.get_mut(&peer_id) {
230 if !send_list.is_empty() {
231 let send = send_list.pop().unwrap();
232 if send_list.is_empty() {
233 send_cache.remove(&peer_id);
234 }
235 return Ok(send);
236 }
237 }
238 }
239 let tunnel = self
240 .tunnel_factory
241 .create_tunnel(&peer_id)
242 .await
243 .map_err(into_pool_err!(PoolErrorCode::Failed))?;
244 let tunnel_id = self.tunnel_id_generator.generate();
245 let (recv, write) = tunnel.split();
246 let remote_id = recv.get_remote_peer_id();
247 let tunnel_meta = recv.get_tunnel_meta();
248 let write = ObjectHolder::new(write);
249 let cmd_handler = self.cmd_handler.clone();
250 let handle = create_recv_handle::<M, R, W, LEN, CMD>(
251 recv,
252 write.clone(),
253 tunnel_id,
254 cmd_handler,
255 );
256 Ok(CommonCmdSend::new(
257 tunnel_id,
258 handle,
259 write,
260 self.resp_waiter.clone(),
261 remote_id,
262 tunnel_meta,
263 ))
264 }
265 } else {
266 Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
267 }
268 }
269}
270
271pub struct CmdNodeWriteFactory<
272 M: CmdTunnelMeta,
273 R: CmdTunnelRead<M>,
274 W: CmdTunnelWrite<M>,
275 F: CmdNodeTunnelFactory<M, R, W>,
276 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
277 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
278 LISTENER: CmdTunnelListener<M, R, W>,
279> {
280 inner: Arc<CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>>,
281}
282
283impl<
284 M: CmdTunnelMeta,
285 R: CmdTunnelRead<M>,
286 W: CmdTunnelWrite<M>,
287 F: CmdNodeTunnelFactory<M, R, W>,
288 LEN: RawEncode
289 + for<'a> RawDecode<'a>
290 + Copy
291 + Send
292 + Sync
293 + 'static
294 + FromPrimitive
295 + ToPrimitive
296 + RawFixedBytes,
297 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
298 LISTENER: CmdTunnelListener<M, R, W>,
299> CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
300{
301 pub(crate) fn new(
302 tunnel_factory: F,
303 tunnel_listener: LISTENER,
304 cmd_handler: impl CmdHandler<LEN, CMD>,
305 resp_waiter: RespWaiterRef,
306 ) -> Self {
307 Self {
308 inner: Arc::new(CmdWriteFactoryImpl::new(
309 tunnel_factory,
310 tunnel_listener,
311 cmd_handler,
312 resp_waiter,
313 )),
314 }
315 }
316
317 pub fn start(&self) {
318 self.inner.start();
319 }
320}
321
322#[async_trait::async_trait]
323impl<
324 M: CmdTunnelMeta,
325 R: CmdTunnelRead<M>,
326 W: CmdTunnelWrite<M>,
327 F: CmdNodeTunnelFactory<M, R, W>,
328 LEN: RawEncode
329 + for<'a> RawDecode<'a>
330 + Copy
331 + Send
332 + Sync
333 + 'static
334 + FromPrimitive
335 + ToPrimitive
336 + RawFixedBytes,
337 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
338 LISTENER: CmdTunnelListener<M, R, W>,
339> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
340 for CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
341{
342 async fn create(
343 &self,
344 c: Option<(PeerId, Option<TunnelId>)>,
345 ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
346 self.inner.create(c).await
347 }
348}
349pub struct DefaultCmdNode<
350 M: CmdTunnelMeta,
351 R: CmdTunnelRead<M>,
352 W: CmdTunnelWrite<M>,
353 F: CmdNodeTunnelFactory<M, R, W>,
354 LEN: RawEncode
355 + for<'a> RawDecode<'a>
356 + Copy
357 + Send
358 + Sync
359 + 'static
360 + FromPrimitive
361 + ToPrimitive
362 + RawFixedBytes,
363 CMD: RawEncode
364 + for<'a> RawDecode<'a>
365 + Copy
366 + Send
367 + Sync
368 + 'static
369 + RawFixedBytes
370 + Eq
371 + Hash
372 + Debug,
373 LISTENER: CmdTunnelListener<M, R, W>,
374> {
375 tunnel_pool: ClassifiedWorkerPoolRef<
376 (PeerId, Option<TunnelId>),
377 CommonCmdSend<M, R, W, LEN, CMD>,
378 CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
379 >,
380 runtime_tunnels: Arc<TunnelRuntimeRegistry>,
381 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
382}
383
384impl<
385 M: CmdTunnelMeta,
386 R: CmdTunnelRead<M>,
387 W: CmdTunnelWrite<M>,
388 F: CmdNodeTunnelFactory<M, R, W>,
389 LEN: RawEncode
390 + for<'a> RawDecode<'a>
391 + Copy
392 + Send
393 + Sync
394 + 'static
395 + FromPrimitive
396 + ToPrimitive
397 + RawFixedBytes,
398 CMD: RawEncode
399 + for<'a> RawDecode<'a>
400 + Copy
401 + Send
402 + Sync
403 + 'static
404 + RawFixedBytes
405 + Eq
406 + Hash
407 + Debug,
408 LISTENER: CmdTunnelListener<M, R, W>,
409> DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
410{
411 fn tunnel_not_found(tunnel_id: TunnelId) -> sfo_result::Error<CmdErrorCode> {
412 crate::errors::cmd_err!(CmdErrorCode::Failed, "tunnel {:?} not found", tunnel_id)
413 }
414
415 pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
416 let cmd_handler_map = Arc::new(CmdHandlerMap::new());
417 let handler_map = cmd_handler_map.clone();
418 let resp_waiter = Arc::new(RespWaiter::new());
419 let waiter = resp_waiter.clone();
420 let write_factory = CmdNodeWriteFactory::<M, R, W, _, LEN, CMD, LISTENER>::new(
421 factory,
422 listener,
423 move |local_id: PeerId,
424 peer_id: PeerId,
425 tunnel_id: TunnelId,
426 header: CmdHeader<LEN, CMD>,
427 body_read: CmdBody| {
428 let handler_map = handler_map.clone();
429 let waiter = waiter.clone();
430 async move {
431 if header.is_resp() && header.seq().is_some() {
432 let resp_id =
433 gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
434 let _ = waiter.set_result(resp_id, body_read);
435 Ok(None)
436 } else {
437 if let Some(handler) = handler_map.get(header.cmd_code()) {
438 handler
439 .handle(local_id, peer_id, tunnel_id, header, body_read)
440 .await
441 } else {
442 Ok(None)
443 }
444 }
445 }
446 },
447 resp_waiter.clone(),
448 );
449 write_factory.start();
450 Arc::new(Self {
451 tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
452 runtime_tunnels: TunnelRuntimeRegistry::new(),
453 cmd_handler_map,
454 })
455 }
456
457 fn tracked_send_guard(
458 &self,
459 worker_guard: ClassifiedWorkerGuard<
460 (PeerId, Option<TunnelId>),
461 CommonCmdSend<M, R, W, LEN, CMD>,
462 CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
463 >,
464 ) -> CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER> {
465 let tunnel_id = worker_guard.get_tunnel_id();
466 TrackedSendGuard::new(worker_guard, self.runtime_tunnels.clone(), tunnel_id)
467 }
468
469 async fn reserve_tunnel(&self, tunnel_id: TunnelId) -> CmdResult<()> {
470 loop {
471 match self.runtime_tunnels.reserve_existing(tunnel_id) {
472 TunnelReserveResult::Acquired => return Ok(()),
473 TunnelReserveResult::Wait(waiter) => waiter.notified().await,
474 TunnelReserveResult::Missing => return Err(Self::tunnel_not_found(tunnel_id)),
475 }
476 }
477 }
478
479 async fn get_send(
480 &self,
481 peer_id: PeerId,
482 ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
483 loop {
484 let worker_guard = self
485 .tunnel_pool
486 .get_classified_worker((peer_id.clone(), None))
487 .await
488 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))?;
489 let tunnel_id = worker_guard.get_tunnel_id();
490 if self.runtime_tunnels.mark_borrowed(tunnel_id) {
491 return Ok(self.tracked_send_guard(worker_guard));
492 }
493 drop(worker_guard);
494 yield_now().await;
495 }
496 }
497
498 async fn get_send_of_tunnel_id(
499 &self,
500 peer_id: PeerId,
501 tunnel_id: TunnelId,
502 ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
503 self.reserve_tunnel(tunnel_id).await?;
504 match self
505 .tunnel_pool
506 .get_classified_worker((peer_id, Some(tunnel_id)))
507 .await
508 {
509 Ok(worker_guard) => Ok(self.tracked_send_guard(worker_guard)),
510 Err(_) => {
511 self.runtime_tunnels.remove(tunnel_id);
512 Err(Self::tunnel_not_found(tunnel_id))
513 }
514 }
515 }
516}
517
518pub type CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER> = TrackedSendGuard<
519 (PeerId, Option<TunnelId>),
520 M,
521 CommonCmdSend<M, R, W, LEN, CMD>,
522 CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
523>;
524#[async_trait::async_trait]
525impl<
526 M: CmdTunnelMeta,
527 R: CmdTunnelRead<M>,
528 W: CmdTunnelWrite<M>,
529 F: CmdNodeTunnelFactory<M, R, W>,
530 LEN: RawEncode
531 + for<'a> RawDecode<'a>
532 + Copy
533 + RawFixedBytes
534 + Sync
535 + Send
536 + 'static
537 + FromPrimitive
538 + ToPrimitive,
539 CMD: RawEncode
540 + for<'a> RawDecode<'a>
541 + Copy
542 + RawFixedBytes
543 + Sync
544 + Send
545 + 'static
546 + Eq
547 + Hash
548 + Debug,
549 LISTENER: CmdTunnelListener<M, R, W>,
550>
551 CmdNode<
552 LEN,
553 CMD,
554 M,
555 CommonCmdSend<M, R, W, LEN, CMD>,
556 CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>,
557 > for DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
558{
559 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
560 self.cmd_handler_map.insert(cmd, handler);
561 }
562
563 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
564 let mut send = self.get_send(peer_id.clone()).await?;
565 send.send(cmd, version, body).await
566 }
567
568 async fn send_with_resp(
569 &self,
570 peer_id: &PeerId,
571 cmd: CMD,
572 version: u8,
573 body: &[u8],
574 timeout: Duration,
575 ) -> CmdResult<CmdBody> {
576 let mut send = self.get_send(peer_id.clone()).await?;
577 send.send_with_resp(cmd, version, body, timeout).await
578 }
579
580 async fn send_parts(
581 &self,
582 peer_id: &PeerId,
583 cmd: CMD,
584 version: u8,
585 body: &[&[u8]],
586 ) -> CmdResult<()> {
587 let mut send = self.get_send(peer_id.clone()).await?;
588 send.send_parts(cmd, version, body).await
589 }
590
591 async fn send_parts_with_resp(
592 &self,
593 peer_id: &PeerId,
594 cmd: CMD,
595 version: u8,
596 body: &[&[u8]],
597 timeout: Duration,
598 ) -> CmdResult<CmdBody> {
599 let mut send = self.get_send(peer_id.clone()).await?;
600 send.send_parts_with_resp(cmd, version, body, timeout).await
601 }
602
603 async fn send_cmd(
604 &self,
605 peer_id: &PeerId,
606 cmd: CMD,
607 version: u8,
608 body: CmdBody,
609 ) -> CmdResult<()> {
610 let mut send = self.get_send(peer_id.clone()).await?;
611 send.send_cmd(cmd, version, body).await
612 }
613
614 async fn send_cmd_with_resp(
615 &self,
616 peer_id: &PeerId,
617 cmd: CMD,
618 version: u8,
619 body: CmdBody,
620 timeout: Duration,
621 ) -> CmdResult<CmdBody> {
622 let mut send = self.get_send(peer_id.clone()).await?;
623 send.send_cmd_with_resp(cmd, version, body, timeout).await
624 }
625
626 async fn send_by_specify_tunnel(
627 &self,
628 peer_id: &PeerId,
629 tunnel_id: TunnelId,
630 cmd: CMD,
631 version: u8,
632 body: &[u8],
633 ) -> CmdResult<()> {
634 let mut send = self
635 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
636 .await?;
637 send.send(cmd, version, body).await
638 }
639
640 async fn send_by_specify_tunnel_with_resp(
641 &self,
642 peer_id: &PeerId,
643 tunnel_id: TunnelId,
644 cmd: CMD,
645 version: u8,
646 body: &[u8],
647 timeout: Duration,
648 ) -> CmdResult<CmdBody> {
649 let mut send = self
650 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
651 .await?;
652 send.send_with_resp(cmd, version, body, timeout).await
653 }
654
655 async fn send_parts_by_specify_tunnel(
656 &self,
657 peer_id: &PeerId,
658 tunnel_id: TunnelId,
659 cmd: CMD,
660 version: u8,
661 body: &[&[u8]],
662 ) -> CmdResult<()> {
663 let mut send = self
664 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
665 .await?;
666 send.send_parts(cmd, version, body).await
667 }
668
669 async fn send_parts_by_specify_tunnel_with_resp(
670 &self,
671 peer_id: &PeerId,
672 tunnel_id: TunnelId,
673 cmd: CMD,
674 version: u8,
675 body: &[&[u8]],
676 timeout: Duration,
677 ) -> CmdResult<CmdBody> {
678 let mut send = self
679 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
680 .await?;
681 send.send_parts_with_resp(cmd, version, body, timeout).await
682 }
683
684 async fn send_cmd_by_specify_tunnel(
685 &self,
686 peer_id: &PeerId,
687 tunnel_id: TunnelId,
688 cmd: CMD,
689 version: u8,
690 body: CmdBody,
691 ) -> CmdResult<()> {
692 let mut send = self
693 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
694 .await?;
695 send.send_cmd(cmd, version, body).await
696 }
697
698 async fn send_cmd_by_specify_tunnel_with_resp(
699 &self,
700 peer_id: &PeerId,
701 tunnel_id: TunnelId,
702 cmd: CMD,
703 version: u8,
704 body: CmdBody,
705 timeout: Duration,
706 ) -> CmdResult<CmdBody> {
707 let mut send = self
708 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
709 .await?;
710 send.send_cmd_with_resp(cmd, version, body, timeout).await
711 }
712
713 async fn clear_all_tunnel(&self) {
714 self.tunnel_pool.clear_all_worker().await;
715 self.runtime_tunnels.clear();
716 }
717
718 async fn get_send(
719 &self,
720 peer_id: &PeerId,
721 tunnel_id: TunnelId,
722 ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
723 self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await
724 }
725}