1mod node;
2mod classified_node;
3
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::ops::DerefMut;
7use std::sync::Arc;
8use std::time::Duration;
9use async_named_locker::ObjectHolder;
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_pool::WorkerClassification;
13use sfo_split::{RHalf, WHalf};
14use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
15use tokio::task::JoinHandle;
16pub use node::*;
17pub use classified_node::*;
18use crate::{CmdBody, CmdHandler, CmdHeader, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId};
19use crate::client::{CmdSend, SendGuard};
20use crate::cmd::CmdBodyRead;
21use crate::errors::{cmd_err, into_cmd_err, CmdErrorCode, CmdResult};
22
23#[async_trait::async_trait]
24pub trait CmdNode<LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
25 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
26 M: CmdTunnelMeta,
27 S: CmdSend<M>,
28 G: SendGuard<M, S>> {
29 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
30 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
31 async fn send_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody>;
32 async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
33 async fn send2_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody>;
34 async fn send_cmd(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
35 async fn send_cmd_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody>;
36 async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
37 async fn send_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody>;
38 async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
39 async fn send2_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody>;
40 async fn send_cmd_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
41 async fn send_cmd_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody>;
42 async fn clear_all_tunnel(&self);
43 async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<G>;
44}
45
46#[async_trait::async_trait]
47pub trait ClassifiedCmdNode<LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send +'static + FromPrimitive + ToPrimitive,
48 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send +'static + Eq + Hash,
49 C: WorkerClassification,
50 M: CmdTunnelMeta,
51 S: CmdSend<M>,
52 G: SendGuard<M, S>>: CmdNode<LEN, CMD, M, S, G> {
53 async fn send_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
54 async fn send_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody>;
55 async fn send2_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
56 async fn send2_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody>;
57 async fn send_cmd_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
58 async fn send_cmd_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody>;
59 async fn send_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
60 async fn send_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody>;
61 async fn send2_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
62 async fn send2_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody>;
63 async fn send_cmd_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
64 async fn send_cmd_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody>;
65 async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
66 async fn find_tunnel_id_by_peer_classified(&self, peer_id: &PeerId,classification: C) -> CmdResult<TunnelId>;
67 async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
68 async fn get_send_by_peer_classified(&self, peer_id: &PeerId, classification: C) -> CmdResult<G>;
69}
70
71
72pub(crate) fn create_recv_handle<M: CmdTunnelMeta,
73 R: CmdTunnelRead<M>,
74 W: CmdTunnelWrite<M>,
75 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
76 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
77>(mut reader: RHalf<R, W>, write: ObjectHolder<WHalf<R, W>>, tunnel_id: TunnelId, cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>) -> JoinHandle<CmdResult<()>> {
78 let recv_handle = tokio::spawn(async move {
79 let ret: CmdResult<()> = async move {
80 let remote_id = reader.get_remote_peer_id();
81 loop {
82 log::trace!("tunnel {:?} enter recv proc", tunnel_id);
83 let header_len = reader.read_u8().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
84 log::trace!("tunnel {:?} recv cmd len {}", tunnel_id, header_len);
85 let mut header = vec![0u8; header_len as usize];
86 let n = reader.read_exact(&mut header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
87 if n == 0 {
88 break;
89 }
90 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
91 log::trace!("tunnel {:?} recv cmd {:?} from {} len {}", tunnel_id, header.cmd_code(), remote_id.to_base36(), header.pkg_len().to_u64().unwrap());
92 let body_len = header.pkg_len().to_u64().unwrap();
93 let cmd_read = CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
94 let waiter = cmd_read.get_waiter();
95 let future = waiter.create_result_future().map_err(into_cmd_err!(CmdErrorCode::Failed))?;
96 {
97 let version = header.version();
98 let seq = header.seq();
99 let cmd_code = header.cmd_code();
100 let body_read = cmd_read;
101 match cmd_handler.handle(remote_id.clone(), tunnel_id, header, CmdBody::from_reader(BufReader::new(body_read), body_len)).await {
102 Ok(Some(mut body)) => {
103 let mut write = write.get().await;
104 let header = CmdHeader::<LEN, CMD>::new(version, true, seq, cmd_code, LEN::from_u64(body.len()).unwrap());
105 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
106 if buf.len() > 255 {
107 return Err(cmd_err!(CmdErrorCode::RawCodecError, "header too long"));
108 }
109 write.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
110 write.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
111 tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
112 write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
113 }
114 Err(e) => {
115 log::error!("handle cmd error: {:?}", e);
116 }
117 _ => {}
118 }
119 };
120 reader = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
121 }
123 Ok(())
124 }.await;
125 ret
126 });
127 recv_handle
128}