1use super::peer_manager::{PeerManager, PeerManagerRef};
2use crate::client::{RespWaiter, RespWaiterRef, gen_resp_id, gen_seq};
3use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
4use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
5use crate::peer_connection::PeerConnection;
6use crate::peer_id::PeerId;
7use crate::server::CmdServer;
8use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
9use async_named_locker::{NamedStateHolder, ObjectHolder};
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_split::Splittable;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::DerefMut;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
19
20#[async_trait::async_trait]
21pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
22 Send + Sync + 'static
23{
24 async fn accept(&self) -> CmdResult<Splittable<R, W>>;
25}
26
27#[async_trait::async_trait]
28pub trait CmdServerEventListener: Send + Sync + 'static {
29 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
30 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
31}
32
33#[derive(Clone)]
34struct CmdServerEventListenerEmit {
35 listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
36}
37
38impl CmdServerEventListenerEmit {
39 pub fn new() -> Self {
40 Self {
41 listeners: Arc::new(Mutex::new(Vec::new())),
42 }
43 }
44
45 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
46 self.listeners.lock().unwrap().push(event_listener);
47 }
48}
49
50#[async_trait::async_trait]
51impl CmdServerEventListener for CmdServerEventListenerEmit {
52 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
53 let listeners = { self.listeners.lock().unwrap().clone() };
54 for listener in listeners.iter() {
55 if let Err(e) = listener.on_peer_connected(peer_id).await {
56 log::error!("on_peer_connected error: {:?}", e);
57 }
58 }
59 Ok(())
60 }
61
62 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
63 let listeners = { self.listeners.lock().unwrap().clone() };
64 for listener in listeners.iter() {
65 if let Err(e) = listener.on_peer_disconnected(peer_id).await {
66 log::error!("on_peer_disconnected error: {:?}", e);
67 }
68 }
69 Ok(())
70 }
71}
72
73pub struct DefaultCmdServer<
74 M: CmdTunnelMeta,
75 R: CmdTunnelRead<M>,
76 W: CmdTunnelWrite<M>,
77 LEN,
78 CMD,
79 LISTENER,
80> {
81 tunnel_listener: LISTENER,
82 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
83 peer_manager: PeerManagerRef<M, R, W>,
84 event_emit: CmdServerEventListenerEmit,
85 resp_waiter: RespWaiterRef,
86 state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
87 _l: Mutex<std::marker::PhantomData<(R, W, LEN, CMD)>>,
88}
89
90impl<
91 M: CmdTunnelMeta,
92 R: CmdTunnelRead<M>,
93 W: CmdTunnelWrite<M>,
94 LEN: RawEncode
95 + for<'a> RawDecode<'a>
96 + Copy
97 + RawFixedBytes
98 + Sync
99 + Send
100 + 'static
101 + FromPrimitive
102 + ToPrimitive,
103 CMD: RawEncode
104 + for<'a> RawDecode<'a>
105 + Copy
106 + RawFixedBytes
107 + Sync
108 + Send
109 + 'static
110 + Eq
111 + Hash
112 + Debug,
113 LISTENER: CmdTunnelListener<M, R, W>,
114> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
115{
116 pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
117 let event_emit = CmdServerEventListenerEmit::new();
118 Arc::new(Self {
119 tunnel_listener,
120 cmd_handler_map: Arc::new(CmdHandlerMap::new()),
121 peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
122 event_emit,
123 resp_waiter: Arc::new(RespWaiter::new()),
124 state_holder: NamedStateHolder::new(),
125 _l: Default::default(),
126 })
127 }
128
129 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
130 self.event_emit.attach_event_listener(event_listener);
131 }
132
133 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
134 let connections = self.peer_manager.find_connections(peer_id);
135 connections
136 }
137
138 pub fn start(self: &Arc<Self>) {
139 let this = self.clone();
140 tokio::spawn(async move {
141 if let Err(e) = this.run().await {
142 log::error!("cmd server error: {:?}", e);
143 }
144 });
145 }
146
147 async fn run(self: &Arc<Self>) -> CmdResult<()> {
148 loop {
149 let tunnel = self.tunnel_listener.accept().await?;
150 let peer_id = tunnel.get_remote_peer_id();
151 let tunnel_id = self.peer_manager.generate_conn_id();
152 let this = self.clone();
153 let resp_waiter = self.resp_waiter.clone();
154 let state_holder = self.state_holder.clone();
155 tokio::spawn(async move {
156 let remote_id = peer_id.clone();
157 let ret: CmdResult<()> = async move {
158 let this = this.clone();
159 let cmd_handler_map = this.cmd_handler_map.clone();
160 let (mut reader, writer) = tunnel.split();
161 let writer = ObjectHolder::new(writer);
162 let resp_write = writer.clone();
163 let resp_waiter = resp_waiter.clone();
164 let state_holder = state_holder.clone();
165 let recv_handle = tokio::spawn(async move {
166 let ret: CmdResult<()> = async move {
167 loop {
168 let header_len = reader
169 .read_u8()
170 .await
171 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
172 let mut header = vec![0u8; header_len as usize];
173 let n = reader
174 .read_exact(&mut header)
175 .await
176 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
177 if n == 0 {
178 break;
179 }
180 let header =
181 CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
182 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
183 sfo_log::debug!("recv cmd {:?}", header.cmd_code());
184 let body_len = header.pkg_len().to_u64().unwrap();
185 let cmd_read = CmdBodyRead::new(
186 reader,
187 header.pkg_len().to_u64().unwrap() as usize,
188 );
189 let waiter = cmd_read.get_waiter();
190 let future = waiter
191 .create_result_future()
192 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
193 {
194 let body_read = cmd_read;
195 let body =
196 CmdBody::from_reader(BufReader::new(body_read), body_len);
197 if header.is_resp() && header.seq().is_some() {
198 let resp_id = gen_resp_id(
199 tunnel_id,
200 header.cmd_code(),
201 header.seq().unwrap(),
202 );
203 let _ = resp_waiter.set_result(resp_id, body);
204 } else {
205 if let Some(handler) =
206 cmd_handler_map.get(header.cmd_code())
207 {
208 let version = header.version();
209 let seq = header.seq();
210 let cmd_code = header.cmd_code();
211 match {
212 let _handle_state =
213 state_holder.new_state(tokio::task::id());
214 handler
215 .handle(
216 remote_id.clone(),
217 tunnel_id,
218 header,
219 body,
220 )
221 .await
222 } {
223 Ok(Some(mut body)) => {
224 let mut write = resp_write.get().await;
225 let header = CmdHeader::<LEN, CMD>::new(
226 version,
227 true,
228 seq,
229 cmd_code,
230 LEN::from_u64(body.len()).unwrap(),
231 );
232 let buf = header.to_vec().map_err(
233 into_cmd_err!(CmdErrorCode::RawCodecError),
234 )?;
235 if buf.len() > 255 {
236 return Err(cmd_err!(
237 CmdErrorCode::RawCodecError,
238 "header len too large"
239 ));
240 }
241 write.write_u8(buf.len() as u8).await.map_err(
242 into_cmd_err!(CmdErrorCode::IoError),
243 )?;
244 write.write_all(buf.as_slice()).await.map_err(
245 into_cmd_err!(CmdErrorCode::IoError),
246 )?;
247 tokio::io::copy(
248 &mut body,
249 write.deref_mut().deref_mut(),
250 )
251 .await
252 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
253 write.flush().await.map_err(into_cmd_err!(
254 CmdErrorCode::IoError
255 ))?;
256 }
257 Err(e) => {
258 log::error!("handle cmd error: {:?}", e);
259 }
260 _ => {}
261 }
262 }
263 }
264 };
265 reader = future
266 .await
267 .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
268 }
270 Ok(())
271 }
272 .await;
273 if ret.is_err() {
274 log::error!("recv cmd error: {:?}", ret.as_ref().err().unwrap());
275 }
276 ret
277 });
278
279 let peer_conn = PeerConnection {
280 conn_id: tunnel_id,
281 peer_id: peer_id.clone(),
282 send: writer,
283 handle: Some(recv_handle),
284 };
285 this.peer_manager.add_peer_connection(peer_conn).await;
286 Ok(())
287 }
288 .await;
289 if let Err(e) = ret {
290 log::error!("peer connection error: {:?}", e);
291 }
292 });
293 }
294 }
295}
296
297#[async_trait::async_trait]
298impl<
299 M: CmdTunnelMeta,
300 R: CmdTunnelRead<M>,
301 W: CmdTunnelWrite<M>,
302 LEN: RawEncode
303 + for<'a> RawDecode<'a>
304 + Copy
305 + RawFixedBytes
306 + Sync
307 + Send
308 + 'static
309 + FromPrimitive
310 + ToPrimitive,
311 CMD: RawEncode
312 + for<'a> RawDecode<'a>
313 + Copy
314 + RawFixedBytes
315 + Sync
316 + Send
317 + 'static
318 + Eq
319 + Hash
320 + Debug,
321 LISTENER: CmdTunnelListener<M, R, W>,
322> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER>
323{
324 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
325 self.cmd_handler_map.insert(cmd, handler);
326 }
327
328 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
329 let connections = self.peer_manager.find_connections(peer_id);
330 for conn in connections {
331 let ret: CmdResult<()> = async move {
332 log::debug!(
333 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
334 peer_id,
335 conn.conn_id,
336 cmd,
337 body.len(),
338 hex::encode(body)
339 );
340 let header = CmdHeader::<LEN, CMD>::new(
341 version,
342 false,
343 None,
344 cmd,
345 LEN::from_u64(body.len() as u64).unwrap(),
346 );
347 let buf = header
348 .to_vec()
349 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
350 let mut send = conn.send.get().await;
351 if buf.len() > 255 {
352 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
353 }
354 send.write_u8(buf.len() as u8)
355 .await
356 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
357 send.write_all(buf.as_slice())
358 .await
359 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
360 send.write_all(body)
361 .await
362 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
363 send.flush()
364 .await
365 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
366 Ok(())
367 }
368 .await;
369 if ret.is_ok() {
370 break;
371 }
372 }
373 Ok(())
374 }
375
376 async fn send_with_resp(
377 &self,
378 peer_id: &PeerId,
379 cmd: CMD,
380 version: u8,
381 body: &[u8],
382 timeout: Duration,
383 ) -> CmdResult<CmdBody> {
384 let connections = self.peer_manager.find_connections(peer_id);
385 for conn in connections {
386 if let Some(id) = tokio::task::try_id() {
387 if self.state_holder.has_state(id) {
388 continue;
389 }
390 }
391 let ret: CmdResult<CmdBody> = async move {
392 log::debug!(
393 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
394 peer_id,
395 conn.conn_id,
396 cmd,
397 body.len(),
398 hex::encode(body)
399 );
400 let seq = gen_seq();
401 let header = CmdHeader::<LEN, CMD>::new(
402 version,
403 false,
404 Some(seq),
405 cmd,
406 LEN::from_u64(body.len() as u64).unwrap(),
407 );
408 let buf = header
409 .to_vec()
410 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
411 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
412 let waiter = self
413 .resp_waiter
414 .create_timeout_result_future(resp_id, timeout)
415 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
416 {
417 let mut send = conn.send.get().await;
418 if buf.len() > 255 {
419 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
420 }
421 send.write_u8(buf.len() as u8)
422 .await
423 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
424 send.write_all(buf.as_slice())
425 .await
426 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
427 send.write_all(body)
428 .await
429 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
430 send.flush()
431 .await
432 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
433 }
434 let body =
435 waiter
436 .await
437 .map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
438 Ok(body)
439 }
440 .await;
441 if ret.is_ok() {
442 return ret;
443 } else {
444 sfo_log::error!("send err {:?}", ret.unwrap_err());
445 }
446 }
447 Err(cmd_err!(
448 CmdErrorCode::Failed,
449 "send to peer_id: {}",
450 peer_id
451 ))
452 }
453
454 async fn send2(
455 &self,
456 peer_id: &PeerId,
457 cmd: CMD,
458 version: u8,
459 body: &[&[u8]],
460 ) -> CmdResult<()> {
461 let connections = self.peer_manager.find_connections(peer_id);
462 for conn in connections {
463 let ret: CmdResult<()> = async move {
464 let mut len = 0;
465 for b in body.iter() {
466 len += b.len();
467 log::debug!(
468 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
469 peer_id,
470 conn.conn_id,
471 cmd,
472 hex::encode(b)
473 );
474 }
475 log::debug!(
476 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
477 peer_id,
478 conn.conn_id,
479 cmd,
480 len
481 );
482 let header = CmdHeader::<LEN, CMD>::new(
483 version,
484 false,
485 None,
486 cmd,
487 LEN::from_u64(len as u64).unwrap(),
488 );
489 let buf = header
490 .to_vec()
491 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
492 let mut send = conn.send.get().await;
493 if buf.len() > 255 {
494 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
495 }
496 send.write_u8(buf.len() as u8)
497 .await
498 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
499 send.write_all(buf.as_slice())
500 .await
501 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
502 for b in body.iter() {
503 send.write_all(b)
504 .await
505 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
506 }
507 send.flush()
508 .await
509 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
510 Ok(())
511 }
512 .await;
513 if ret.is_ok() {
514 break;
515 }
516 }
517 Ok(())
518 }
519
520 async fn send2_with_resp(
521 &self,
522 peer_id: &PeerId,
523 cmd: CMD,
524 version: u8,
525 body: &[&[u8]],
526 timeout: Duration,
527 ) -> CmdResult<CmdBody> {
528 let connections = self.peer_manager.find_connections(peer_id);
529 for conn in connections {
530 if let Some(id) = tokio::task::try_id() {
531 if self.state_holder.has_state(id) {
532 continue;
533 }
534 }
535 let ret: CmdResult<CmdBody> = async move {
536 let mut len = 0;
537 for b in body.iter() {
538 len += b.len();
539 log::debug!(
540 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
541 peer_id,
542 conn.conn_id,
543 cmd,
544 hex::encode(b)
545 );
546 }
547 log::debug!(
548 "send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
549 peer_id,
550 conn.conn_id,
551 cmd,
552 len
553 );
554 let seq = gen_seq();
555 let header = CmdHeader::<LEN, CMD>::new(
556 version,
557 false,
558 Some(seq),
559 cmd,
560 LEN::from_u64(len as u64).unwrap(),
561 );
562 let buf = header
563 .to_vec()
564 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
565 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
566 let waiter = self
567 .resp_waiter
568 .create_timeout_result_future(resp_id, timeout)
569 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
570 {
571 let mut send = conn.send.get().await;
572 if buf.len() > 255 {
573 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
574 }
575 send.write_u8(buf.len() as u8)
576 .await
577 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
578 send.write_all(buf.as_slice())
579 .await
580 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
581 for b in body.iter() {
582 send.write_all(b)
583 .await
584 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
585 }
586 send.flush()
587 .await
588 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
589 }
590 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
591 Ok(body)
592 }
593 .await;
594 if ret.is_ok() {
595 return ret;
596 }
597 }
598 Err(cmd_err!(
599 CmdErrorCode::Failed,
600 "send to peer_id: {}",
601 peer_id
602 ))
603 }
604
605 async fn send_cmd(
606 &self,
607 peer_id: &PeerId,
608 cmd: CMD,
609 version: u8,
610 body: CmdBody,
611 ) -> CmdResult<()> {
612 let body_data = body.into_bytes().await?;
613 let body = body_data.as_slice();
614 let connections = self.peer_manager.find_connections(peer_id);
615 for conn in connections {
616 let ret: CmdResult<()> = async move {
617 log::debug!(
618 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}",
619 peer_id,
620 conn.conn_id,
621 cmd,
622 body.len(),
623 hex::encode(body)
624 );
625 let header = CmdHeader::<LEN, CMD>::new(
626 version,
627 false,
628 None,
629 cmd,
630 LEN::from_u64(body.len() as u64).unwrap(),
631 );
632 let buf = header
633 .to_vec()
634 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
635 let mut send = conn.send.get().await;
636 if buf.len() > 255 {
637 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
638 }
639 send.write_u8(buf.len() as u8)
640 .await
641 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
642 send.write_all(buf.as_slice())
643 .await
644 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
645 send.write_all(body)
646 .await
647 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
648 send.flush()
649 .await
650 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
651 Ok(())
652 }
653 .await;
654 if ret.is_ok() {
655 break;
656 }
657 }
658 Ok(())
659 }
660
661 async fn send_cmd_with_resp(
662 &self,
663 peer_id: &PeerId,
664 cmd: CMD,
665 version: u8,
666 body: CmdBody,
667 timeout: Duration,
668 ) -> CmdResult<CmdBody> {
669 let connections = self.peer_manager.find_connections(peer_id);
670 let body_data = body.into_bytes().await?;
671 let data_ref = body_data.as_slice();
672 for conn in connections {
673 if let Some(id) = tokio::task::try_id() {
674 if self.state_holder.has_state(id) {
675 continue;
676 }
677 }
678 let ret: CmdResult<CmdBody> = async move {
679 log::debug!(
680 "send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}",
681 peer_id,
682 conn.conn_id,
683 cmd,
684 data_ref.len()
685 );
686 let seq = gen_seq();
687 let header = CmdHeader::<LEN, CMD>::new(
688 version,
689 false,
690 Some(seq),
691 cmd,
692 LEN::from_u64(data_ref.len() as u64).unwrap(),
693 );
694 let buf = header
695 .to_vec()
696 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
697 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
698 let waiter = self
699 .resp_waiter
700 .create_timeout_result_future(resp_id, timeout)
701 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
702 {
703 let mut send = conn.send.get().await;
704 if buf.len() > 255 {
705 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
706 }
707 send.write_u8(buf.len() as u8)
708 .await
709 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
710 send.write_all(buf.as_slice())
711 .await
712 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
713 send.write_all(data_ref)
714 .await
715 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
716 send.flush()
717 .await
718 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
719 }
720 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
721 Ok(body)
722 }
723 .await;
724 if ret.is_ok() {
725 return ret;
726 }
727 }
728 Err(cmd_err!(
729 CmdErrorCode::Failed,
730 "send to peer_id: {}",
731 peer_id
732 ))
733 }
734
735 async fn send_by_specify_tunnel(
736 &self,
737 peer_id: &PeerId,
738 tunnel_id: TunnelId,
739 cmd: CMD,
740 version: u8,
741 body: &[u8],
742 ) -> CmdResult<()> {
743 let conn = self.peer_manager.find_connection(tunnel_id);
744 if conn.is_none() {
745 return Err(cmd_err!(
746 CmdErrorCode::PeerConnectionNotFound,
747 "tunnel_id: {:?}",
748 tunnel_id
749 ));
750 }
751 let conn = conn.unwrap();
752 assert_eq!(tunnel_id, conn.conn_id);
753 log::trace!(
754 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
755 peer_id,
756 conn.conn_id,
757 cmd,
758 body.len(),
759 hex::encode(body)
760 );
761 let header = CmdHeader::<LEN, CMD>::new(
762 version,
763 false,
764 None,
765 cmd,
766 LEN::from_u64(body.len() as u64).unwrap(),
767 );
768 let buf = header
769 .to_vec()
770 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
771 let mut send = conn.send.get().await;
772 if buf.len() > 255 {
773 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
774 }
775 send.write_u8(buf.len() as u8)
776 .await
777 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
778 send.write_all(buf.as_slice())
779 .await
780 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
781 send.write_all(body)
782 .await
783 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
784 send.flush()
785 .await
786 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
787 Ok(())
788 }
789
790 async fn send_by_specify_tunnel_with_resp(
791 &self,
792 peer_id: &PeerId,
793 tunnel_id: TunnelId,
794 cmd: CMD,
795 version: u8,
796 body: &[u8],
797 timeout: Duration,
798 ) -> CmdResult<CmdBody> {
799 let conn = self.peer_manager.find_connection(tunnel_id);
800 if conn.is_none() {
801 return Err(cmd_err!(
802 CmdErrorCode::PeerConnectionNotFound,
803 "tunnel_id: {:?}",
804 tunnel_id
805 ));
806 }
807 let conn = conn.unwrap();
808 if let Some(id) = tokio::task::try_id() {
809 if self.state_holder.has_state(id) {
810 return Err(cmd_err!(
811 CmdErrorCode::Failed,
812 "can't send msg with resp in tunnel {:?} msg handle",
813 conn.conn_id
814 ));
815 }
816 }
817 assert_eq!(tunnel_id, conn.conn_id);
818 log::trace!(
819 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}",
820 peer_id,
821 conn.conn_id,
822 cmd,
823 body.len(),
824 hex::encode(body)
825 );
826 let seq = gen_seq();
827 let header = CmdHeader::<LEN, CMD>::new(
828 version,
829 false,
830 Some(seq),
831 cmd,
832 LEN::from_u64(body.len() as u64).unwrap(),
833 );
834 let buf = header
835 .to_vec()
836 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
837 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
838 let waiter = self
839 .resp_waiter
840 .create_timeout_result_future(resp_id, timeout)
841 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
842 {
843 let mut send = conn.send.get().await;
844 if buf.len() > 255 {
845 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
846 }
847 send.write_u8(buf.len() as u8)
848 .await
849 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
850 send.write_all(buf.as_slice())
851 .await
852 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
853 send.write_all(body)
854 .await
855 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
856 send.flush()
857 .await
858 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
859 }
860 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
861 Ok(body)
862 }
863
864 async fn send2_by_specify_tunnel(
865 &self,
866 peer_id: &PeerId,
867 tunnel_id: TunnelId,
868 cmd: CMD,
869 version: u8,
870 body: &[&[u8]],
871 ) -> CmdResult<()> {
872 let conn = self.peer_manager.find_connection(tunnel_id);
873 if conn.is_none() {
874 return Err(cmd_err!(
875 CmdErrorCode::PeerConnectionNotFound,
876 "tunnel_id: {:?}",
877 tunnel_id
878 ));
879 }
880 let conn = conn.unwrap();
881 assert_eq!(tunnel_id, conn.conn_id);
882 let mut len = 0;
883 for b in body.iter() {
884 len += b.len();
885 log::debug!(
886 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
887 peer_id,
888 conn.conn_id,
889 cmd,
890 hex::encode(b)
891 );
892 }
893 log::debug!(
894 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
895 peer_id,
896 conn.conn_id,
897 cmd,
898 len
899 );
900 let header = CmdHeader::<LEN, CMD>::new(
901 version,
902 false,
903 None,
904 cmd,
905 LEN::from_u64(len as u64).unwrap(),
906 );
907 let buf = header
908 .to_vec()
909 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
910 if buf.len() > 255 {
911 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
912 }
913 let mut send = conn.send.get().await;
914 send.write_u8(buf.len() as u8)
915 .await
916 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
917 send.write_all(buf.as_slice())
918 .await
919 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
920 for b in body.iter() {
921 send.write_all(b)
922 .await
923 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
924 }
925 send.flush()
926 .await
927 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
928 Ok(())
929 }
930
931 async fn send2_by_specify_tunnel_with_resp(
932 &self,
933 peer_id: &PeerId,
934 tunnel_id: TunnelId,
935 cmd: CMD,
936 version: u8,
937 body: &[&[u8]],
938 timeout: Duration,
939 ) -> CmdResult<CmdBody> {
940 let conn = self.peer_manager.find_connection(tunnel_id);
941 if conn.is_none() {
942 return Err(cmd_err!(
943 CmdErrorCode::PeerConnectionNotFound,
944 "tunnel_id: {:?}",
945 tunnel_id
946 ));
947 }
948 let conn = conn.unwrap();
949 if let Some(id) = tokio::task::try_id() {
950 if self.state_holder.has_state(id) {
951 return Err(cmd_err!(
952 CmdErrorCode::Failed,
953 "can't send msg with resp in tunnel {:?} msg handle",
954 conn.conn_id
955 ));
956 }
957 }
958 assert_eq!(tunnel_id, conn.conn_id);
959 let mut len = 0;
960 for b in body.iter() {
961 len += b.len();
962 log::debug!(
963 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}",
964 peer_id,
965 conn.conn_id,
966 cmd,
967 hex::encode(b)
968 );
969 }
970 log::debug!(
971 "send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}",
972 peer_id,
973 conn.conn_id,
974 cmd,
975 len
976 );
977 let seq = gen_seq();
978 let header = CmdHeader::<LEN, CMD>::new(
979 version,
980 false,
981 Some(seq),
982 cmd,
983 LEN::from_u64(len as u64).unwrap(),
984 );
985 let buf = header
986 .to_vec()
987 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
988 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
989 let waiter = self
990 .resp_waiter
991 .create_timeout_result_future(resp_id, timeout)
992 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
993 if buf.len() > 255 {
994 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
995 }
996 {
997 let mut send = conn.send.get().await;
998 send.write_u8(buf.len() as u8)
999 .await
1000 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1001 send.write_all(buf.as_slice())
1002 .await
1003 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1004 for b in body.iter() {
1005 send.write_all(b)
1006 .await
1007 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1008 }
1009 send.flush()
1010 .await
1011 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1012 }
1013 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1014 Ok(body)
1015 }
1016
1017 async fn send_cmd_by_specify_tunnel(
1018 &self,
1019 peer_id: &PeerId,
1020 tunnel_id: TunnelId,
1021 cmd: CMD,
1022 version: u8,
1023 mut body: CmdBody,
1024 ) -> CmdResult<()> {
1025 let conn = self.peer_manager.find_connection(tunnel_id);
1026 if conn.is_none() {
1027 return Err(cmd_err!(
1028 CmdErrorCode::PeerConnectionNotFound,
1029 "tunnel_id: {:?}",
1030 tunnel_id
1031 ));
1032 }
1033 let conn = conn.unwrap();
1034 assert_eq!(tunnel_id, conn.conn_id);
1035 log::debug!(
1036 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1037 peer_id,
1038 conn.conn_id,
1039 cmd,
1040 body.len()
1041 );
1042 let header = CmdHeader::<LEN, CMD>::new(
1043 version,
1044 false,
1045 None,
1046 cmd,
1047 LEN::from_u64(body.len()).unwrap(),
1048 );
1049 let buf = header
1050 .to_vec()
1051 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1052 let mut send = conn.send.get().await;
1053 if buf.len() > 255 {
1054 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1055 }
1056 send.write_u8(buf.len() as u8)
1057 .await
1058 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1059 send.write_all(buf.as_slice())
1060 .await
1061 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1062 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1063 .await
1064 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1065 send.flush()
1066 .await
1067 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1068 Ok(())
1069 }
1070
1071 async fn send_cmd_by_specify_tunnel_with_resp(
1072 &self,
1073 peer_id: &PeerId,
1074 tunnel_id: TunnelId,
1075 cmd: CMD,
1076 version: u8,
1077 mut body: CmdBody,
1078 timeout: Duration,
1079 ) -> CmdResult<CmdBody> {
1080 let conn = self.peer_manager.find_connection(tunnel_id);
1081 if conn.is_none() {
1082 return Err(cmd_err!(
1083 CmdErrorCode::PeerConnectionNotFound,
1084 "tunnel_id: {:?}",
1085 tunnel_id
1086 ));
1087 }
1088 let conn = conn.unwrap();
1089 if let Some(id) = tokio::task::try_id() {
1090 if self.state_holder.has_state(id) {
1091 return Err(cmd_err!(
1092 CmdErrorCode::Failed,
1093 "can't send msg with resp in tunnel {:?} msg handle",
1094 conn.conn_id
1095 ));
1096 }
1097 }
1098 assert_eq!(tunnel_id, conn.conn_id);
1099 log::debug!(
1100 "send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}",
1101 peer_id,
1102 conn.conn_id,
1103 cmd,
1104 body.len()
1105 );
1106 let seq = gen_seq();
1107 let header = CmdHeader::<LEN, CMD>::new(
1108 version,
1109 false,
1110 Some(seq),
1111 cmd,
1112 LEN::from_u64(body.len()).unwrap(),
1113 );
1114 let buf = header
1115 .to_vec()
1116 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1117 let resp_id = gen_resp_id(conn.conn_id, cmd, seq);
1118 let waiter = self
1119 .resp_waiter
1120 .create_timeout_result_future(resp_id, timeout)
1121 .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
1122 {
1123 let mut send = conn.send.get().await;
1124 if buf.len() > 255 {
1125 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1126 }
1127 send.write_u8(buf.len() as u8)
1128 .await
1129 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1130 send.write_all(buf.as_slice())
1131 .await
1132 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1133 tokio::io::copy(&mut body, send.deref_mut().deref_mut())
1134 .await
1135 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1136 send.flush()
1137 .await
1138 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1139 }
1140 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
1141 Ok(body)
1142 }
1143
1144 async fn send_by_all_tunnels(
1145 &self,
1146 peer_id: &PeerId,
1147 cmd: CMD,
1148 version: u8,
1149 body: &[u8],
1150 ) -> CmdResult<()> {
1151 let connections = self.peer_manager.find_connections(peer_id);
1152 for conn in connections {
1153 let _ret: CmdResult<()> = async move {
1154 let header = CmdHeader::<LEN, CMD>::new(
1155 version,
1156 false,
1157 None,
1158 cmd,
1159 LEN::from_u64(body.len() as u64).unwrap(),
1160 );
1161 let buf = header
1162 .to_vec()
1163 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1164 let mut send = conn.send.get().await;
1165 send.write_u8(buf.len() as u8)
1166 .await
1167 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1168 send.write_all(buf.as_slice())
1169 .await
1170 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1171 send.write_all(body)
1172 .await
1173 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1174 send.flush()
1175 .await
1176 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1177 Ok(())
1178 }
1179 .await;
1180 }
1181 Ok(())
1182 }
1183
1184 async fn send2_by_all_tunnels(
1185 &self,
1186 peer_id: &PeerId,
1187 cmd: CMD,
1188 version: u8,
1189 body: &[&[u8]],
1190 ) -> CmdResult<()> {
1191 let connections = self.peer_manager.find_connections(peer_id);
1192 let mut len = 0;
1193 for b in body.iter() {
1194 len += b.len();
1195 }
1196 for conn in connections {
1197 let _ret: CmdResult<()> = async move {
1198 let header = CmdHeader::<LEN, CMD>::new(
1199 version,
1200 false,
1201 None,
1202 cmd,
1203 LEN::from_u64(len as u64).unwrap(),
1204 );
1205 let buf = header
1206 .to_vec()
1207 .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
1208 if buf.len() > 255 {
1209 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
1210 }
1211 let mut send = conn.send.get().await;
1212 send.write_u8(buf.len() as u8)
1213 .await
1214 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1215 send.write_all(buf.as_slice())
1216 .await
1217 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1218 for b in body.iter() {
1219 send.write_all(b)
1220 .await
1221 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1222 }
1223 send.flush()
1224 .await
1225 .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
1226 Ok(())
1227 }
1228 .await;
1229 }
1230 Ok(())
1231 }
1232}