sfo_cmd_server/node/
mod.rs

1mod node;
2mod classified_node;
3
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::sync::Arc;
7use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes, RawFrom};
8use num::{FromPrimitive, ToPrimitive};
9use sfo_pool::WorkerClassification;
10use sfo_split::RHalf;
11use tokio::io::AsyncReadExt;
12use tokio::task::JoinHandle;
13pub use node::*;
14pub use classified_node::*;
15use crate::{CmdHandler, CmdHeader, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId};
16use crate::cmd::CmdBodyReadImpl;
17use crate::errors::{into_cmd_err, CmdErrorCode, CmdResult};
18
19#[async_trait::async_trait]
20pub trait CmdNode<LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
21    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash> {
22    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
23    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
24    async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
25    async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
26    async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
27    async fn clear_all_tunnel(&self);
28}
29
30#[async_trait::async_trait]
31pub trait ClassifiedCmdNode<LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send +'static + FromPrimitive + ToPrimitive,
32    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send +'static + Eq + Hash, C: WorkerClassification>: CmdNode<LEN, CMD> {
33    async fn send_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
34    async fn send2_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
35    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
36}
37
38
39pub(crate) fn create_recv_handle<R: CmdTunnelRead,
40    W: CmdTunnelWrite,
41    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
42    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
43>(mut reader: RHalf<R, W>, tunnel_id: TunnelId, cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>) -> JoinHandle<CmdResult<()>> {
44    let recv_handle = tokio::spawn(async move {
45        let ret: CmdResult<()> = async move {
46            let remote_id = reader.get_remote_peer_id();
47            loop {
48                let mut header = vec![0u8; CmdHeader::<LEN, CMD>::raw_bytes().unwrap()];
49                let n = reader.read_exact(&mut header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
50                if n == 0 {
51                    break;
52                }
53                let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
54                log::trace!("recv cmd {:?} from {} len {}", header.cmd_code(), remote_id.to_base58(), header.pkg_len().to_u64().unwrap());
55                let cmd_read = Box::new(CmdBodyReadImpl::new(reader, header.pkg_len().to_u64().unwrap() as usize));
56                let waiter = cmd_read.get_waiter();
57                let future = waiter.create_result_future();
58                {
59                    let body_read = cmd_read;
60                    if let Err(e) = cmd_handler.handle(remote_id.clone(), tunnel_id, header, body_read).await {
61                        log::error!("handle cmd error: {:?}", e);
62                    }
63                };
64                reader = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
65                // }
66            }
67            Ok(())
68        }.await;
69        ret
70    });
71    recv_handle
72}