sfo_cmd_server/node/
mod.rs1mod node;
2mod classified_node;
3
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::sync::Arc;
7use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes, RawFrom};
8use num::{FromPrimitive, ToPrimitive};
9use sfo_pool::WorkerClassification;
10use sfo_split::RHalf;
11use tokio::io::AsyncReadExt;
12use tokio::task::JoinHandle;
13pub use node::*;
14pub use classified_node::*;
15use crate::{CmdHandler, CmdHeader, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId};
16use crate::cmd::CmdBodyReadImpl;
17use crate::errors::{into_cmd_err, CmdErrorCode, CmdResult};
18
19#[async_trait::async_trait]
20pub trait CmdNode<LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
21 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash> {
22 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
23 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
24 async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
25 async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
26 async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
27 async fn clear_all_tunnel(&self);
28}
29
30#[async_trait::async_trait]
31pub trait ClassifiedCmdNode<LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send +'static + FromPrimitive + ToPrimitive,
32 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send +'static + Eq + Hash, C: WorkerClassification>: CmdNode<LEN, CMD> {
33 async fn send_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
34 async fn send2_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
35 async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
36}
37
38
39pub(crate) fn create_recv_handle<R: CmdTunnelRead,
40 W: CmdTunnelWrite,
41 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
42 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
43>(mut reader: RHalf<R, W>, tunnel_id: TunnelId, cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>) -> JoinHandle<CmdResult<()>> {
44 let recv_handle = tokio::spawn(async move {
45 let ret: CmdResult<()> = async move {
46 let remote_id = reader.get_remote_peer_id();
47 loop {
48 let mut header = vec![0u8; CmdHeader::<LEN, CMD>::raw_bytes().unwrap()];
49 let n = reader.read_exact(&mut header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
50 if n == 0 {
51 break;
52 }
53 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
54 log::trace!("recv cmd {:?} from {} len {}", header.cmd_code(), remote_id.to_base58(), header.pkg_len().to_u64().unwrap());
55 let cmd_read = Box::new(CmdBodyReadImpl::new(reader, header.pkg_len().to_u64().unwrap() as usize));
56 let waiter = cmd_read.get_waiter();
57 let future = waiter.create_result_future();
58 {
59 let body_read = cmd_read;
60 if let Err(e) = cmd_handler.handle(remote_id.clone(), tunnel_id, header, body_read).await {
61 log::error!("handle cmd error: {:?}", e);
62 }
63 };
64 reader = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
65 }
67 Ok(())
68 }.await;
69 ret
70 });
71 recv_handle
72}