1use super::CmdServer;
2use super::peer_manager::{PeerManager, PeerManagerRef};
3use crate::client::{RespWaiter, RespWaiterRef, gen_resp_id, gen_seq};
4use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
5use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
6use crate::peer_connection::PeerConnection;
7use crate::peer_id::PeerId;
8use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
9use async_named_locker::{NamedStateHolder, ObjectHolder};
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_split::Splittable;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::marker::PhantomData;
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
20
21#[async_trait::async_trait]
22pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
23 Send + Sync + 'static
24{
25 async fn accept(&self) -> CmdResult<Splittable<R, W>>;
26}
27
28#[async_trait::async_trait]
29pub trait CmdTunnelService<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
30 Send + Sync + 'static
31{
32 async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()>;
33}
34
/// Callbacks for peer lifecycle events raised by the command server.
#[async_trait::async_trait]
pub trait CmdServerEventListener: Send + Sync + 'static {
    /// Called with the id of a peer that has connected.
    async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
    /// Called with the id of a peer that has disconnected.
    async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
}
40
/// Fan-out wrapper that forwards peer events to every attached listener.
///
/// Cloning is cheap: clones share the same listener list through the `Arc`.
#[derive(Clone)]
struct CmdServerEventListenerEmit {
    // Attached listeners, guarded by a std mutex.
    listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
}
45
46impl CmdServerEventListenerEmit {
47 pub fn new() -> Self {
48 Self {
49 listeners: Arc::new(Mutex::new(Vec::new())),
50 }
51 }
52
53 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
54 self.listeners.lock().unwrap().push(event_listener);
55 }
56}
57
58#[async_trait::async_trait]
59impl CmdServerEventListener for CmdServerEventListenerEmit {
60 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
61 let listeners = { self.listeners.lock().unwrap().clone() };
62 for listener in listeners.iter() {
63 if let Err(e) = listener.on_peer_connected(peer_id).await {
64 log::error!("on_peer_connected error: {:?}", e);
65 }
66 }
67 Ok(())
68 }
69
70 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
71 let listeners = { self.listeners.lock().unwrap().clone() };
72 for listener in listeners.iter() {
73 if let Err(e) = listener.on_peer_disconnected(peer_id).await {
74 log::error!("on_peer_disconnected error: {:?}", e);
75 }
76 }
77 Ok(())
78 }
79}
80
/// Default command server: serves tunnels, dispatches received commands to
/// registered handlers and sends commands to connected peers.
///
/// `LEN` is the integer type of the header's body-length field and `CMD` is
/// the command-code type.
pub struct DefaultCmdServerService<
    M: CmdTunnelMeta,
    R: CmdTunnelRead<M>,
    W: CmdTunnelWrite<M>,
    LEN,
    CMD,
> {
    // Handlers looked up by command code for each received non-response command.
    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
    // Registry of the connections created by `serve_tunnel`.
    peer_manager: PeerManagerRef<M, R, W>,
    // Fan-out of peer events; the same emitter is handed to `peer_manager`.
    event_emit: CmdServerEventListenerEmit,
    // Pending response futures, keyed by the id produced by `gen_resp_id`.
    resp_waiter: RespWaiterRef,
    // Marks tokio tasks that are currently running a command handler; the
    // `*_with_resp` senders consult it before waiting for a response.
    state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
    // Ties the otherwise unused type parameters to the struct.
    _p: PhantomData<fn() -> (M, R, W, LEN, CMD)>,
}
95
96impl<
97 M: CmdTunnelMeta,
98 R: CmdTunnelRead<M>,
99 W: CmdTunnelWrite<M>,
100 LEN: RawEncode
101 + for<'a> RawDecode<'a>
102 + Copy
103 + RawFixedBytes
104 + Sync
105 + Send
106 + 'static
107 + FromPrimitive
108 + ToPrimitive,
109 CMD: RawEncode
110 + for<'a> RawDecode<'a>
111 + Copy
112 + RawFixedBytes
113 + Sync
114 + Send
115 + 'static
116 + Eq
117 + Hash
118 + Debug,
119> DefaultCmdServerService<M, R, W, LEN, CMD>
120{
121 fn encode_pkg_len(len: u64) -> CmdResult<LEN> {
122 LEN::from_u64(len).ok_or_else(|| {
123 cmd_err!(
124 CmdErrorCode::InvalidParam,
125 "body len {} exceeds header len type {}",
126 len,
127 std::any::type_name::<LEN>()
128 )
129 })
130 }
131
132 pub fn new() -> Arc<Self> {
133 let event_emit = CmdServerEventListenerEmit::new();
134 Arc::new(Self {
135 cmd_handler_map: Arc::new(CmdHandlerMap::new()),
136 peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
137 event_emit,
138 resp_waiter: Arc::new(RespWaiter::new()),
139 state_holder: NamedStateHolder::new(),
140 _p: PhantomData,
141 })
142 }
143
144 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
145 self.event_emit.attach_event_listener(event_listener);
146 }
147
148 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
149 self.peer_manager.find_connections(peer_id)
150 }
151
152 pub async fn serve_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
153 let peer_id = tunnel.get_remote_peer_id();
154 let tunnel_id = self.peer_manager.generate_conn_id();
155 let cmd_handler_map = self.cmd_handler_map.clone();
156 let resp_waiter = self.resp_waiter.clone();
157 let state_holder = self.state_holder.clone();
158 let (mut reader, writer) = tunnel.split();
159 let local_id = reader.get_local_peer_id();
160 let writer = ObjectHolder::new(writer);
161 let resp_write = writer.clone();
162 let remote_id = peer_id.clone();
163 let recv_handle = tokio::spawn(async move {
164 let ret: CmdResult<()> = async move {
165 loop {
166 let header_len = reader
167 .read_u8()
168 .await
169 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
170 let mut header = vec![0u8; header_len as usize];
171 let n = reader
172 .read_exact(&mut header)
173 .await
174 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
175 if n == 0 {
176 break;
177 }
178 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
179 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
180 sfo_log::debug!("recv cmd {:?}", header.cmd_code());
181 let body_len = header.pkg_len().to_u64().unwrap();
182 let cmd_read = CmdBodyRead::new(reader, body_len as usize);
183 let waiter = cmd_read.get_waiter();
184 let future = waiter
185 .create_result_future()
186 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
187 {
188 let body = CmdBody::from_reader(BufReader::new(cmd_read), body_len);
189 if header.is_resp() && header.seq().is_some() {
190 let resp_id =
191 gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
192 let _ = resp_waiter.set_result(resp_id, body);
193 } else if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
194 let version = header.version();
195 let seq = header.seq();
196 let cmd_code = header.cmd_code();
197 match {
198 let _handle_state = state_holder.new_state(tokio::task::id());
199 handler
200 .handle(
201 local_id.clone(),
202 remote_id.clone(),
203 tunnel_id,
204 header,
205 body,
206 )
207 .await
208 } {
209 Ok(Some(mut body)) => {
210 let mut write = resp_write.get().await;
211 let header = CmdHeader::<LEN, CMD>::new(
212 version,
213 true,
214 seq,
215 cmd_code,
216 Self::encode_pkg_len(body.len())?,
217 );
218 let buf = header
219 .to_vec()
220 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
221 if buf.len() > 255 {
222 return Err(cmd_err!(
223 CmdErrorCode::RawCodecError,
224 "header len too large"
225 ));
226 }
227 write
228 .write_u8(buf.len() as u8)
229 .await
230 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
231 write
232 .write_all(buf.as_slice())
233 .await
234 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
235 tokio::io::copy(&mut body, write.deref_mut().deref_mut())
236 .await
237 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
238 write
239 .flush()
240 .await
241 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
242 }
243 Err(e) => {
244 log::error!("handle cmd error: {:?}", e);
245 }
246 Ok(None) => {}
247 }
248 }
249 }
250 reader = future
251 .await
252 .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
253 }
254 Ok(())
255 }
256 .await;
257 if let Err(e) = ret.as_ref() {
258 log::error!("recv cmd error: {:?}", e);
259 }
260 ret
261 });
262
263 let peer_conn = PeerConnection {
264 conn_id: tunnel_id,
265 peer_id,
266 send: writer,
267 handle: Some(recv_handle),
268 };
269 self.peer_manager.add_peer_connection(peer_conn).await;
270 Ok(())
271 }
272}
273
274#[async_trait::async_trait]
275impl<
276 M: CmdTunnelMeta,
277 R: CmdTunnelRead<M>,
278 W: CmdTunnelWrite<M>,
279 LEN: RawEncode
280 + for<'a> RawDecode<'a>
281 + Copy
282 + RawFixedBytes
283 + Sync
284 + Send
285 + 'static
286 + FromPrimitive
287 + ToPrimitive,
288 CMD: RawEncode
289 + for<'a> RawDecode<'a>
290 + Copy
291 + RawFixedBytes
292 + Sync
293 + Send
294 + 'static
295 + Eq
296 + Hash
297 + Debug,
298> CmdTunnelService<M, R, W> for DefaultCmdServerService<M, R, W, LEN, CMD>
299{
300 async fn handle_tunnel(&self, tunnel: Splittable<R, W>) -> CmdResult<()> {
301 self.serve_tunnel(tunnel).await
302 }
303}
304
305#[async_trait::async_trait]
306impl<
307 M: CmdTunnelMeta,
308 R: CmdTunnelRead<M>,
309 W: CmdTunnelWrite<M>,
310 LEN: RawEncode
311 + for<'a> RawDecode<'a>
312 + Copy
313 + RawFixedBytes
314 + Sync
315 + Send
316 + 'static
317 + FromPrimitive
318 + ToPrimitive,
319 CMD: RawEncode
320 + for<'a> RawDecode<'a>
321 + Copy
322 + RawFixedBytes
323 + Sync
324 + Send
325 + 'static
326 + Eq
327 + Hash
328 + Debug,
329> CmdServer<LEN, CMD> for DefaultCmdServerService<M, R, W, LEN, CMD>
330{
/// Stores `handler` in the handler map under the command code `cmd`; the
/// receive loop looks handlers up by the command code of each incoming header.
fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
    self.cmd_handler_map.insert(cmd, handler);
}
334
335 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
336 let connections = self.peer_manager.find_connections(peer_id);
337 if connections.is_empty() {
338 return Err(cmd_err!(
339 CmdErrorCode::PeerConnectionNotFound,
340 "peer_id: {}",
341 peer_id
342 ));
343 }
344 let mut last_err = None;
345 for conn in connections {
346 let ret: CmdResult<()> = async move {
347 log::debug!(
348 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
349 peer_id,
350 conn.conn_id,
351 cmd,
352 body.len(),
353 hex::encode(body)
354 );
355 let header = CmdHeader::<LEN, CMD>::new(
356 version,
357 false,
358 None,
359 cmd,
360 Self::encode_pkg_len(body.len() as u64)?,
361 );
362 let buf = header
363 .to_vec()
364 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
365 let mut send = conn.send.get().await;
366 if buf.len() > 255 {
367 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
368 }
369 send.write_u8(buf.len() as u8)
370 .await
371 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
372 send.write_all(buf.as_slice())
373 .await
374 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
375 send.write_all(body)
376 .await
377 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
378 send.flush()
379 .await
380 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
381 Ok(())
382 }
383 .await;
384 if ret.is_ok() {
385 return Ok(());
386 }
387 last_err = ret.err();
388 }
389 Err(last_err
390 .unwrap_or_else(|| cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id)))
391 }
392
393 async fn send_with_resp(
394 &self,
395 peer_id: &PeerId,
396 cmd: CMD,
397 version: u8,
398 body: &[u8],
399 timeout: Duration,
400 ) -> CmdResult<CmdBody> {
401 let connections = self.peer_manager.find_connections(peer_id);
402 for conn in connections {
403 if let Some(id) = tokio::task::try_id() {
404 if self.state_holder.has_state(id) {
405 continue;
406 }
407 }
408 let ret: CmdResult<CmdBody> = async move {
409 log::debug!(
410 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
411 peer_id,
412 conn.conn_id,
413 cmd,
414 body.len(),
415 hex::encode(body)
416 );
417 let seq = gen_seq();
418 let header = CmdHeader::<LEN, CMD>::new(
419 version,
420 false,
421 Some(seq),
422 cmd,
423 Self::encode_pkg_len(body.len() as u64)?,
424 );
425 let buf = header
426 .to_vec()
427 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
428 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
429 let waiter = self
430 .resp_waiter
431 .create_timeout_result_future(resp_id, timeout)
432 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
433 {
434 let mut send = conn.send.get().await;
435 if buf.len() > 255 {
436 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
437 }
438 send.write_u8(buf.len() as u8)
439 .await
440 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
441 send.write_all(buf.as_slice())
442 .await
443 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
444 send.write_all(body)
445 .await
446 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
447 send.flush()
448 .await
449 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
450 }
451 let body =
452 waiter
453 .await
454 .map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
455 Ok(body)
456 }
457 .await;
458 if ret.is_ok() {
459 return ret;
460 } else {
461 sfo_log::error!("send err {:?}", ret.unwrap_err());
462 }
463 }
464 Err(cmd_err!(
465 CmdErrorCode::Failed,
466 "send to peer_id: {}",
467 peer_id
468 ))
469 }
470
471 async fn send_parts(
472 &self,
473 peer_id: &PeerId,
474 cmd: CMD,
475 version: u8,
476 body: &[&[u8]],
477 ) -> CmdResult<()> {
478 let connections = self.peer_manager.find_connections(peer_id);
479 if connections.is_empty() {
480 return Err(cmd_err!(
481 CmdErrorCode::PeerConnectionNotFound,
482 "peer_id: {}",
483 peer_id
484 ));
485 }
486 let mut last_err = None;
487 for conn in connections {
488 let ret: CmdResult<()> = async move {
489 let mut len = 0;
490 for b in body.iter() {
491 len += b.len();
492 log::debug!(
493 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
494 peer_id,
495 conn.conn_id,
496 cmd,
497 hex::encode(b)
498 );
499 }
500 log::debug!(
501 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
502 peer_id,
503 conn.conn_id,
504 cmd,
505 len
506 );
507 let header = CmdHeader::<LEN, CMD>::new(
508 version,
509 false,
510 None,
511 cmd,
512 Self::encode_pkg_len(len as u64)?,
513 );
514 let buf = header
515 .to_vec()
516 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
517 let mut send = conn.send.get().await;
518 if buf.len() > 255 {
519 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
520 }
521 send.write_u8(buf.len() as u8)
522 .await
523 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
524 send.write_all(buf.as_slice())
525 .await
526 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
527 for b in body.iter() {
528 send.write_all(b)
529 .await
530 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
531 }
532 send.flush()
533 .await
534 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
535 Ok(())
536 }
537 .await;
538 if ret.is_ok() {
539 return Ok(());
540 }
541 last_err = ret.err();
542 }
543 Err(last_err
544 .unwrap_or_else(|| cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id)))
545 }
546
547 async fn send_parts_with_resp(
548 &self,
549 peer_id: &PeerId,
550 cmd: CMD,
551 version: u8,
552 body: &[&[u8]],
553 timeout: Duration,
554 ) -> CmdResult<CmdBody> {
555 let connections = self.peer_manager.find_connections(peer_id);
556 for conn in connections {
557 if let Some(id) = tokio::task::try_id() {
558 if self.state_holder.has_state(id) {
559 continue;
560 }
561 }
562 let ret: CmdResult<CmdBody> = async move {
563 let mut len = 0;
564 for b in body.iter() {
565 len += b.len();
566 log::debug!(
567 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
568 peer_id,
569 conn.conn_id,
570 cmd,
571 hex::encode(b)
572 );
573 }
574 log::debug!(
575 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
576 peer_id,
577 conn.conn_id,
578 cmd,
579 len
580 );
581 let seq = gen_seq();
582 let header = CmdHeader::<LEN, CMD>::new(
583 version,
584 false,
585 Some(seq),
586 cmd,
587 Self::encode_pkg_len(len as u64)?,
588 );
589 let buf = header
590 .to_vec()
591 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
592 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
593 let waiter = self
594 .resp_waiter
595 .create_timeout_result_future(resp_id, timeout)
596 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
597 {
598 let mut send = conn.send.get().await;
599 if buf.len() > 255 {
600 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
601 }
602 send.write_u8(buf.len() as u8)
603 .await
604 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
605 send.write_all(buf.as_slice())
606 .await
607 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
608 for b in body.iter() {
609 send.write_all(b)
610 .await
611 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
612 }
613 send.flush()
614 .await
615 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
616 }
617 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
618 Ok(body)
619 }
620 .await;
621 if ret.is_ok() {
622 return ret;
623 }
624 }
625 Err(cmd_err!(
626 CmdErrorCode::Failed,
627 "send to peer_id: {}",
628 peer_id
629 ))
630 }
631
632 async fn send_cmd(
633 &self,
634 peer_id: &PeerId,
635 cmd: CMD,
636 version: u8,
637 mut body: CmdBody,
638 ) -> CmdResult<()> {
639 let connections = self.peer_manager.find_connections(peer_id);
640 if connections.is_empty() {
641 return Err(cmd_err!(
642 CmdErrorCode::PeerConnectionNotFound,
643 "peer_id: {}",
644 peer_id
645 ));
646 }
647 let mut last_err = None;
648 for conn in connections {
649 let ret: CmdResult<()> = async {
650 log::debug!(
651 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
652 peer_id,
653 conn.conn_id,
654 cmd,
655 body.len(),
656 "<streaming>"
657 );
658 let header = CmdHeader::<LEN, CMD>::new(
659 version,
660 false,
661 None,
662 cmd,
663 Self::encode_pkg_len(body.len() as u64)?,
664 );
665 let buf = header
666 .to_vec()
667 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
668 let mut send = conn.send.get().await;
669 if buf.len() > 255 {
670 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
671 }
672 send.write_u8(buf.len() as u8)
673 .await
674 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
675 send.write_all(buf.as_slice())
676 .await
677 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
678 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
679 .await
680 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
681 send.flush()
682 .await
683 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
684 Ok(())
685 }
686 .await;
687 if ret.is_ok() {
688 return Ok(());
689 }
690 last_err = ret.err();
691 }
692 Err(last_err
693 .unwrap_or_else(|| cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id)))
694 }
695
696 async fn send_cmd_with_resp(
697 &self,
698 peer_id: &PeerId,
699 cmd: CMD,
700 version: u8,
701 mut body: CmdBody,
702 timeout: Duration,
703 ) -> CmdResult<CmdBody> {
704 let connections = self.peer_manager.find_connections(peer_id);
705 if connections.is_empty() {
706 return Err(cmd_err!(
707 CmdErrorCode::PeerConnectionNotFound,
708 "peer_id: {}",
709 peer_id
710 ));
711 }
712 let mut last_err = None;
713 for conn in connections {
714 if let Some(id) = tokio::task::try_id() {
715 if self.state_holder.has_state(id) {
716 continue;
717 }
718 }
719 let ret: CmdResult<CmdBody> = async {
720 log::debug!(
721 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}",
722 peer_id,
723 conn.conn_id,
724 cmd,
725 body.len()
726 );
727 let seq = gen_seq();
728 let header = CmdHeader::<LEN, CMD>::new(
729 version,
730 false,
731 Some(seq),
732 cmd,
733 Self::encode_pkg_len(body.len())?,
734 );
735 let buf = header
736 .to_vec()
737 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
738 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
739 let waiter = self
740 .resp_waiter
741 .create_timeout_result_future(resp_id, timeout)
742 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
743 {
744 let mut send = conn.send.get().await;
745 if buf.len() > 255 {
746 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
747 }
748 send.write_u8(buf.len() as u8)
749 .await
750 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
751 send.write_all(buf.as_slice())
752 .await
753 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
754 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
755 .await
756 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
757 send.flush()
758 .await
759 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
760 }
761 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
762 Ok(body)
763 }
764 .await;
765 if ret.is_ok() {
766 return ret;
767 }
768 last_err = ret.err();
769 }
770 Err(last_err
771 .unwrap_or_else(|| cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id)))
772 }
773
774 async fn send_by_specify_tunnel(
775 &self,
776 peer_id: &PeerId,
777 tunnel_id: TunnelId,
778 cmd: CMD,
779 version: u8,
780 body: &[u8],
781 ) -> CmdResult<()> {
782 let conn = self.peer_manager.find_connection(tunnel_id);
783 if conn.is_none() {
784 return Err(cmd_err!(
785 CmdErrorCode::PeerConnectionNotFound,
786 "tunnel_id: {:?}",
787 tunnel_id
788 ));
789 }
790 let conn = conn.unwrap();
791 assert_eq!(tunnel_id, conn.conn_id);
792 log::trace!(
793 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
794 peer_id,
795 conn.conn_id,
796 cmd,
797 body.len(),
798 hex::encode(body)
799 );
800 let header = CmdHeader::<LEN, CMD>::new(
801 version,
802 false,
803 None,
804 cmd,
805 Self::encode_pkg_len(body.len() as u64)?,
806 );
807 let buf = header
808 .to_vec()
809 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
810 let mut send = conn.send.get().await;
811 if buf.len() > 255 {
812 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
813 }
814 send.write_u8(buf.len() as u8)
815 .await
816 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
817 send.write_all(buf.as_slice())
818 .await
819 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
820 send.write_all(body)
821 .await
822 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
823 send.flush()
824 .await
825 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
826 Ok(())
827 }
828
829 async fn send_by_specify_tunnel_with_resp(
830 &self,
831 peer_id: &PeerId,
832 tunnel_id: TunnelId,
833 cmd: CMD,
834 version: u8,
835 body: &[u8],
836 timeout: Duration,
837 ) -> CmdResult<CmdBody> {
838 let conn = self.peer_manager.find_connection(tunnel_id);
839 if conn.is_none() {
840 return Err(cmd_err!(
841 CmdErrorCode::PeerConnectionNotFound,
842 "tunnel_id: {:?}",
843 tunnel_id
844 ));
845 }
846 let conn = conn.unwrap();
847 if let Some(id) = tokio::task::try_id() {
848 if self.state_holder.has_state(id) {
849 return Err(cmd_err!(
850 CmdErrorCode::Failed,
851 "can't send msg with resp in tunnel {:?} msg handle",
852 conn.conn_id
853 ));
854 }
855 }
856 assert_eq!(tunnel_id, conn.conn_id);
857 log::trace!(
858 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
859 peer_id,
860 conn.conn_id,
861 cmd,
862 body.len(),
863 hex::encode(body)
864 );
865 let seq = gen_seq();
866 let header = CmdHeader::<LEN, CMD>::new(
867 version,
868 false,
869 Some(seq),
870 cmd,
871 Self::encode_pkg_len(body.len() as u64)?,
872 );
873 let buf = header
874 .to_vec()
875 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
876 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
877 let waiter = self
878 .resp_waiter
879 .create_timeout_result_future(resp_id, timeout)
880 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
881 {
882 let mut send = conn.send.get().await;
883 if buf.len() > 255 {
884 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
885 }
886 send.write_u8(buf.len() as u8)
887 .await
888 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
889 send.write_all(buf.as_slice())
890 .await
891 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
892 send.write_all(body)
893 .await
894 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
895 send.flush()
896 .await
897 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
898 }
899 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
900 Ok(body)
901 }
902
903 async fn send_parts_by_specify_tunnel(
904 &self,
905 peer_id: &PeerId,
906 tunnel_id: TunnelId,
907 cmd: CMD,
908 version: u8,
909 body: &[&[u8]],
910 ) -> CmdResult<()> {
911 let conn = self.peer_manager.find_connection(tunnel_id);
912 if conn.is_none() {
913 return Err(cmd_err!(
914 CmdErrorCode::PeerConnectionNotFound,
915 "tunnel_id: {:?}",
916 tunnel_id
917 ));
918 }
919 let conn = conn.unwrap();
920 assert_eq!(tunnel_id, conn.conn_id);
921 let mut len = 0;
922 for b in body.iter() {
923 len += b.len();
924 log::debug!(
925 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
926 peer_id,
927 conn.conn_id,
928 cmd,
929 hex::encode(b)
930 );
931 }
932 log::debug!(
933 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
934 peer_id,
935 conn.conn_id,
936 cmd,
937 len
938 );
939 let header = CmdHeader::<LEN, CMD>::new(
940 version,
941 false,
942 None,
943 cmd,
944 Self::encode_pkg_len(len as u64)?,
945 );
946 let buf = header
947 .to_vec()
948 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
949 if buf.len() > 255 {
950 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
951 }
952 let mut send = conn.send.get().await;
953 send.write_u8(buf.len() as u8)
954 .await
955 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
956 send.write_all(buf.as_slice())
957 .await
958 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
959 for b in body.iter() {
960 send.write_all(b)
961 .await
962 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
963 }
964 send.flush()
965 .await
966 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
967 Ok(())
968 }
969
970 async fn send_parts_by_specify_tunnel_with_resp(
971 &self,
972 peer_id: &PeerId,
973 tunnel_id: TunnelId,
974 cmd: CMD,
975 version: u8,
976 body: &[&[u8]],
977 timeout: Duration,
978 ) -> CmdResult<CmdBody> {
979 let conn = self.peer_manager.find_connection(tunnel_id);
980 if conn.is_none() {
981 return Err(cmd_err!(
982 CmdErrorCode::PeerConnectionNotFound,
983 "tunnel_id: {:?}",
984 tunnel_id
985 ));
986 }
987 let conn = conn.unwrap();
988 if let Some(id) = tokio::task::try_id() {
989 if self.state_holder.has_state(id) {
990 return Err(cmd_err!(
991 CmdErrorCode::Failed,
992 "can't send msg with resp in tunnel {:?} msg handle",
993 conn.conn_id
994 ));
995 }
996 }
997 assert_eq!(tunnel_id, conn.conn_id);
998 let mut len = 0;
999 for b in body.iter() {
1000 len += b.len();
1001 log::debug!(
1002 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
1003 peer_id,
1004 conn.conn_id,
1005 cmd,
1006 hex::encode(b)
1007 );
1008 }
1009 log::debug!(
1010 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
1011 peer_id,
1012 conn.conn_id,
1013 cmd,
1014 len
1015 );
1016 let seq = gen_seq();
1017 let header = CmdHeader::<LEN, CMD>::new(
1018 version,
1019 false,
1020 Some(seq),
1021 cmd,
1022 Self::encode_pkg_len(len as u64)?,
1023 );
1024 let buf = header
1025 .to_vec()
1026 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1027 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1028 let waiter = self
1029 .resp_waiter
1030 .create_timeout_result_future(resp_id, timeout)
1031 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1032 if buf.len() > 255 {
1033 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1034 }
1035 {
1036 let mut send = conn.send.get().await;
1037 send.write_u8(buf.len() as u8)
1038 .await
1039 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1040 send.write_all(buf.as_slice())
1041 .await
1042 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1043 for b in body.iter() {
1044 send.write_all(b)
1045 .await
1046 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1047 }
1048 send.flush()
1049 .await
1050 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1051 }
1052 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1053 Ok(body)
1054 }
1055
1056 async fn send_cmd_by_specify_tunnel(
1057 &self,
1058 peer_id: &PeerId,
1059 tunnel_id: TunnelId,
1060 cmd: CMD,
1061 version: u8,
1062 mut body: CmdBody,
1063 ) -> CmdResult<()> {
1064 let conn = self.peer_manager.find_connection(tunnel_id);
1065 if conn.is_none() {
1066 return Err(cmd_err!(
1067 CmdErrorCode::PeerConnectionNotFound,
1068 "tunnel_id: {:?}",
1069 tunnel_id
1070 ));
1071 }
1072 let conn = conn.unwrap();
1073 assert_eq!(tunnel_id, conn.conn_id);
1074 log::debug!(
1075 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1076 peer_id,
1077 conn.conn_id,
1078 cmd,
1079 body.len()
1080 );
1081 let header = CmdHeader::<LEN, CMD>::new(
1082 version,
1083 false,
1084 None,
1085 cmd,
1086 Self::encode_pkg_len(body.len())?,
1087 );
1088 let buf = header
1089 .to_vec()
1090 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1091 let mut send = conn.send.get().await;
1092 if buf.len() > 255 {
1093 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1094 }
1095 send.write_u8(buf.len() as u8)
1096 .await
1097 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1098 send.write_all(buf.as_slice())
1099 .await
1100 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1101 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1102 .await
1103 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1104 send.flush()
1105 .await
1106 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1107 Ok(())
1108 }
1109
1110 async fn send_cmd_by_specify_tunnel_with_resp(
1111 &self,
1112 peer_id: &PeerId,
1113 tunnel_id: TunnelId,
1114 cmd: CMD,
1115 version: u8,
1116 mut body: CmdBody,
1117 timeout: Duration,
1118 ) -> CmdResult<CmdBody> {
1119 let conn = self.peer_manager.find_connection(tunnel_id);
1120 if conn.is_none() {
1121 return Err(cmd_err!(
1122 CmdErrorCode::PeerConnectionNotFound,
1123 "tunnel_id: {:?}",
1124 tunnel_id
1125 ));
1126 }
1127 let conn = conn.unwrap();
1128 if let Some(id) = tokio::task::try_id() {
1129 if self.state_holder.has_state(id) {
1130 return Err(cmd_err!(
1131 CmdErrorCode::Failed,
1132 "can't send msg with resp in tunnel {:?} msg handle",
1133 conn.conn_id
1134 ));
1135 }
1136 }
1137 assert_eq!(tunnel_id, conn.conn_id);
1138 log::debug!(
1139 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1140 peer_id,
1141 conn.conn_id,
1142 cmd,
1143 body.len()
1144 );
1145 let seq = gen_seq();
1146 let header = CmdHeader::<LEN, CMD>::new(
1147 version,
1148 false,
1149 Some(seq),
1150 cmd,
1151 Self::encode_pkg_len(body.len())?,
1152 );
1153 let buf = header
1154 .to_vec()
1155 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1156 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1157 let waiter = self
1158 .resp_waiter
1159 .create_timeout_result_future(resp_id, timeout)
1160 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1161 {
1162 let mut send = conn.send.get().await;
1163 if buf.len() > 255 {
1164 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1165 }
1166 send.write_u8(buf.len() as u8)
1167 .await
1168 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1169 send.write_all(buf.as_slice())
1170 .await
1171 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1172 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1173 .await
1174 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1175 send.flush()
1176 .await
1177 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1178 }
1179 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1180 Ok(body)
1181 }
1182
1183 async fn send_by_all_tunnels(
1184 &self,
1185 peer_id: &PeerId,
1186 cmd: CMD,
1187 version: u8,
1188 body: &[u8],
1189 ) -> CmdResult<()> {
1190 let connections = self.peer_manager.find_connections(peer_id);
1191 let header = CmdHeader::<LEN, CMD>::new(
1192 version,
1193 false,
1194 None,
1195 cmd,
1196 Self::encode_pkg_len(body.len() as u64)?,
1197 );
1198 let buf = header
1199 .to_vec()
1200 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1201 if buf.len() > 255 {
1202 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1203 }
1204 for conn in connections {
1205 let ret: CmdResult<()> = async {
1206 let mut send = conn.send.get().await;
1207 send.write_u8(buf.len() as u8)
1208 .await
1209 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1210 send.write_all(buf.as_slice())
1211 .await
1212 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1213 send.write_all(body)
1214 .await
1215 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1216 send.flush()
1217 .await
1218 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1219 Ok(())
1220 }
1221 .await;
1222 if let Err(e) = ret {
1223 log::error!(
1224 "broadcast send failed peer_id: {}, tunnel_id: {:?}, cmd: {:?}, err: {:?}",
1225 peer_id,
1226 conn.conn_id,
1227 cmd,
1228 e
1229 );
1230 }
1231 }
1232 Ok(())
1233 }
1234
1235 async fn send_parts_by_all_tunnels(
1236 &self,
1237 peer_id: &PeerId,
1238 cmd: CMD,
1239 version: u8,
1240 body: &[&[u8]],
1241 ) -> CmdResult<()> {
1242 let connections = self.peer_manager.find_connections(peer_id);
1243 let mut len = 0;
1244 for b in body.iter() {
1245 len += b.len();
1246 }
1247 let header = CmdHeader::<LEN, CMD>::new(
1248 version,
1249 false,
1250 None,
1251 cmd,
1252 Self::encode_pkg_len(len as u64)?,
1253 );
1254 let buf = header
1255 .to_vec()
1256 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1257 if buf.len() > 255 {
1258 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1259 }
1260 for conn in connections {
1261 let ret: CmdResult<()> = async {
1262 let mut send = conn.send.get().await;
1263 send.write_u8(buf.len() as u8)
1264 .await
1265 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1266 send.write_all(buf.as_slice())
1267 .await
1268 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1269 for b in body.iter() {
1270 send.write_all(b)
1271 .await
1272 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1273 }
1274 send.flush()
1275 .await
1276 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1277 Ok(())
1278 }
1279 .await;
1280 if let Err(e) = ret {
1281 log::error!(
1282 "broadcast send2 failed peer_id: {}, tunnel_id: {:?}, cmd: {:?}, err: {:?}",
1283 peer_id,
1284 conn.conn_id,
1285 cmd,
1286 e
1287 );
1288 }
1289 }
1290 Ok(())
1291 }
1292}
1293
1294pub struct DefaultCmdServerIncoming<
1295 M: CmdTunnelMeta,
1296 R: CmdTunnelRead<M>,
1297 W: CmdTunnelWrite<M>,
1298 LISTENER,
1299> {
1300 tunnel_listener: LISTENER,
1301 tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1302 _p: PhantomData<fn() -> (M, R, W)>,
1303}
1304
1305impl<
1306 M: CmdTunnelMeta,
1307 R: CmdTunnelRead<M>,
1308 W: CmdTunnelWrite<M>,
1309 LISTENER: CmdTunnelListener<M, R, W>,
1310> DefaultCmdServerIncoming<M, R, W, LISTENER>
1311{
1312 pub fn new(
1313 tunnel_listener: LISTENER,
1314 tunnel_service: Arc<dyn CmdTunnelService<M, R, W>>,
1315 ) -> Arc<Self> {
1316 Arc::new(Self {
1317 tunnel_listener,
1318 tunnel_service,
1319 _p: PhantomData,
1320 })
1321 }
1322
1323 pub fn start(self: &Arc<Self>) {
1324 let this = self.clone();
1325 tokio::spawn(async move {
1326 if let Err(e) = this.run().await {
1327 log::error!("cmd server error: {:?}", e);
1328 }
1329 });
1330 }
1331
1332 pub async fn run(&self) -> CmdResult<()> {
1333 loop {
1334 let tunnel = self.tunnel_listener.accept().await?;
1335 let tunnel_service = self.tunnel_service.clone();
1336 tokio::spawn(async move {
1337 if let Err(e) = tunnel_service.handle_tunnel(tunnel).await {
1338 log::error!("peer connection error: {:?}", e);
1339 }
1340 });
1341 }
1342 }
1343}
1344
1345pub struct DefaultCmdServer<
1346 M: CmdTunnelMeta,
1347 R: CmdTunnelRead<M>,
1348 W: CmdTunnelWrite<M>,
1349 LEN,
1350 CMD,
1351 LISTENER,
1352> {
1353 incoming: Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>>,
1354 service: Arc<DefaultCmdServerService<M, R, W, LEN, CMD>>,
1355}
1356
1357impl<
1358 M: CmdTunnelMeta,
1359 R: CmdTunnelRead<M>,
1360 W: CmdTunnelWrite<M>,
1361 LEN: RawEncode
1362 + for<'a> RawDecode<'a>
1363 + Copy
1364 + RawFixedBytes
1365 + Sync
1366 + Send
1367 + 'static
1368 + FromPrimitive
1369 + ToPrimitive,
1370 CMD: RawEncode
1371 + for<'a> RawDecode<'a>
1372 + Copy
1373 + RawFixedBytes
1374 + Sync
1375 + Send
1376 + 'static
1377 + Eq
1378 + Hash
1379 + Debug,
1380 LISTENER: CmdTunnelListener<M, R, W>,
1381> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1382{
1383 pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
1384 let service = DefaultCmdServerService::new();
1385 let incoming = DefaultCmdServerIncoming::new(tunnel_listener, service.clone());
1386 Arc::new(Self { incoming, service })
1387 }
1388
1389 pub fn incoming(&self) -> Arc<DefaultCmdServerIncoming<M, R, W, LISTENER>> {
1390 self.incoming.clone()
1391 }
1392
1393 pub fn service(&self) -> Arc<DefaultCmdServerService<M, R, W, LEN, CMD>> {
1394 self.service.clone()
1395 }
1396
1397 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
1398 self.service.attach_event_listener(event_listener);
1399 }
1400
1401 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
1402 self.service.get_peer_tunnels(peer_id).await
1403 }
1404
1405 pub fn start(self: &Arc<Self>) {
1406 self.incoming.start();
1407 }
1408}
1409
1410impl<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> Deref
1411 for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1412{
1413 type Target = DefaultCmdServerService<M, R, W, LEN, CMD>;
1414
1415 fn deref(&self) -> &Self::Target {
1416 self.service.as_ref()
1417 }
1418}
1419
1420#[async_trait::async_trait]
1421impl<
1422 M: CmdTunnelMeta,
1423 R: CmdTunnelRead<M>,
1424 W: CmdTunnelWrite<M>,
1425 LEN: RawEncode
1426 + for<'a> RawDecode<'a>
1427 + Copy
1428 + RawFixedBytes
1429 + Sync
1430 + Send
1431 + 'static
1432 + FromPrimitive
1433 + ToPrimitive,
1434 CMD: RawEncode
1435 + for<'a> RawDecode<'a>
1436 + Copy
1437 + RawFixedBytes
1438 + Sync
1439 + Send
1440 + 'static
1441 + Eq
1442 + Hash
1443 + Debug,
1444 LISTENER: CmdTunnelListener<M, R, W>,
1445> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
1446{
1447 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
1448 self.service.register_cmd_handler(cmd, handler);
1449 }
1450
1451 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
1452 self.service.send(peer_id, cmd, version, body).await
1453 }
1454
1455 async fn send_with_resp(
1456 &self,
1457 peer_id: &PeerId,
1458 cmd: CMD,
1459 version: u8,
1460 body: &[u8],
1461 timeout: Duration,
1462 ) -> CmdResult<CmdBody> {
1463 self.service
1464 .send_with_resp(peer_id, cmd, version, body, timeout)
1465 .await
1466 }
1467
1468 async fn send_parts(
1469 &self,
1470 peer_id: &PeerId,
1471 cmd: CMD,
1472 version: u8,
1473 body: &[&[u8]],
1474 ) -> CmdResult<()> {
1475 self.service.send_parts(peer_id, cmd, version, body).await
1476 }
1477
1478 async fn send_parts_with_resp(
1479 &self,
1480 peer_id: &PeerId,
1481 cmd: CMD,
1482 version: u8,
1483 body: &[&[u8]],
1484 timeout: Duration,
1485 ) -> CmdResult<CmdBody> {
1486 self.service
1487 .send_parts_with_resp(peer_id, cmd, version, body, timeout)
1488 .await
1489 }
1490
1491 async fn send_cmd(
1492 &self,
1493 peer_id: &PeerId,
1494 cmd: CMD,
1495 version: u8,
1496 body: CmdBody,
1497 ) -> CmdResult<()> {
1498 self.service.send_cmd(peer_id, cmd, version, body).await
1499 }
1500
1501 async fn send_cmd_with_resp(
1502 &self,
1503 peer_id: &PeerId,
1504 cmd: CMD,
1505 version: u8,
1506 body: CmdBody,
1507 timeout: Duration,
1508 ) -> CmdResult<CmdBody> {
1509 self.service
1510 .send_cmd_with_resp(peer_id, cmd, version, body, timeout)
1511 .await
1512 }
1513
1514 async fn send_by_specify_tunnel(
1515 &self,
1516 peer_id: &PeerId,
1517 tunnel_id: TunnelId,
1518 cmd: CMD,
1519 version: u8,
1520 body: &[u8],
1521 ) -> CmdResult<()> {
1522 self.service
1523 .send_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1524 .await
1525 }
1526
1527 async fn send_by_specify_tunnel_with_resp(
1528 &self,
1529 peer_id: &PeerId,
1530 tunnel_id: TunnelId,
1531 cmd: CMD,
1532 version: u8,
1533 body: &[u8],
1534 timeout: Duration,
1535 ) -> CmdResult<CmdBody> {
1536 self.service
1537 .send_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1538 .await
1539 }
1540
1541 async fn send_parts_by_specify_tunnel(
1542 &self,
1543 peer_id: &PeerId,
1544 tunnel_id: TunnelId,
1545 cmd: CMD,
1546 version: u8,
1547 body: &[&[u8]],
1548 ) -> CmdResult<()> {
1549 self.service
1550 .send_parts_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1551 .await
1552 }
1553
1554 async fn send_parts_by_specify_tunnel_with_resp(
1555 &self,
1556 peer_id: &PeerId,
1557 tunnel_id: TunnelId,
1558 cmd: CMD,
1559 version: u8,
1560 body: &[&[u8]],
1561 timeout: Duration,
1562 ) -> CmdResult<CmdBody> {
1563 self.service
1564 .send_parts_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1565 .await
1566 }
1567
1568 async fn send_cmd_by_specify_tunnel(
1569 &self,
1570 peer_id: &PeerId,
1571 tunnel_id: TunnelId,
1572 cmd: CMD,
1573 version: u8,
1574 body: CmdBody,
1575 ) -> CmdResult<()> {
1576 self.service
1577 .send_cmd_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
1578 .await
1579 }
1580
1581 async fn send_cmd_by_specify_tunnel_with_resp(
1582 &self,
1583 peer_id: &PeerId,
1584 tunnel_id: TunnelId,
1585 cmd: CMD,
1586 version: u8,
1587 body: CmdBody,
1588 timeout: Duration,
1589 ) -> CmdResult<CmdBody> {
1590 self.service
1591 .send_cmd_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
1592 .await
1593 }
1594
1595 async fn send_by_all_tunnels(
1596 &self,
1597 peer_id: &PeerId,
1598 cmd: CMD,
1599 version: u8,
1600 body: &[u8],
1601 ) -> CmdResult<()> {
1602 self.service
1603 .send_by_all_tunnels(peer_id, cmd, version, body)
1604 .await
1605 }
1606
1607 async fn send_parts_by_all_tunnels(
1608 &self,
1609 peer_id: &PeerId,
1610 cmd: CMD,
1611 version: u8,
1612 body: &[&[u8]],
1613 ) -> CmdResult<()> {
1614 self.service
1615 .send_parts_by_all_tunnels(peer_id, cmd, version, body)
1616 .await
1617 }
1618}