sfo_cmd_server/node/
mod.rs

1mod node;
2mod classified_node;
3
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::ops::DerefMut;
7use std::sync::Arc;
8use std::time::Duration;
9use async_named_locker::ObjectHolder;
10use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
11use num::{FromPrimitive, ToPrimitive};
12use sfo_pool::WorkerClassification;
13use sfo_split::{RHalf, WHalf};
14use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
15use tokio::task::JoinHandle;
16pub use node::*;
17pub use classified_node::*;
18use crate::{CmdBody, CmdHandler, CmdHeader, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId};
19use crate::client::{CmdSend, SendGuard};
20use crate::cmd::CmdBodyRead;
21use crate::errors::{cmd_err, into_cmd_err, CmdErrorCode, CmdResult};
22
23#[async_trait::async_trait]
24pub trait CmdNode<LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
25    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
26    M: CmdTunnelMeta,
27    S: CmdSend<M>,
28    G: SendGuard<M, S>> {
29    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
30    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
31    async fn send_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody>;
32    async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
33    async fn send2_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody>;
34    async fn send_cmd(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
35    async fn send_cmd_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody>;
36    async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
37    async fn send_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody>;
38    async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
39    async fn send2_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody>;
40    async fn send_cmd_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
41    async fn send_cmd_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody>;
42    async fn clear_all_tunnel(&self);
43    async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<G>;
44}
45
46#[async_trait::async_trait]
47pub trait ClassifiedCmdNode<LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send +'static + FromPrimitive + ToPrimitive,
48    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send +'static + Eq + Hash,
49    C: WorkerClassification,
50    M: CmdTunnelMeta,
51    S: CmdSend<M>,
52    G: SendGuard<M, S>>: CmdNode<LEN, CMD, M, S, G> {
53    async fn send_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
54    async fn send_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody>;
55    async fn send2_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
56    async fn send2_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody>;
57    async fn send_cmd_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
58    async fn send_cmd_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody>;
59    async fn send_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
60    async fn send_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody>;
61    async fn send2_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
62    async fn send2_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody>;
63    async fn send_cmd_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
64    async fn send_cmd_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody>;
65    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
66    async fn find_tunnel_id_by_peer_classified(&self, peer_id: &PeerId,classification: C) -> CmdResult<TunnelId>;
67    async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
68    async fn get_send_by_peer_classified(&self, peer_id: &PeerId, classification: C) -> CmdResult<G>;
69}
70
71
72pub(crate) fn create_recv_handle<M: CmdTunnelMeta,
73    R: CmdTunnelRead<M>,
74    W: CmdTunnelWrite<M>,
75    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
76    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
77>(mut reader: RHalf<R, W>, write: ObjectHolder<WHalf<R, W>>, tunnel_id: TunnelId, cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>) -> JoinHandle<CmdResult<()>> {
78    let recv_handle = tokio::spawn(async move {
79        let ret: CmdResult<()> = async move {
80            let remote_id = reader.get_remote_peer_id();
81            loop {
82                log::trace!("tunnel {:?} enter recv proc", tunnel_id);
83                let header_len = reader.read_u8().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
84                log::trace!("tunnel {:?} recv cmd len {}", tunnel_id, header_len);
85                let mut header = vec![0u8; header_len as usize];
86                let n = reader.read_exact(&mut header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
87                if n == 0 {
88                    break;
89                }
90                let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
91                log::trace!("tunnel {:?} recv cmd {:?} from {} len {}", tunnel_id, header.cmd_code(), remote_id.to_base36(), header.pkg_len().to_u64().unwrap());
92                let body_len = header.pkg_len().to_u64().unwrap();
93                let cmd_read = CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
94                let waiter = cmd_read.get_waiter();
95                let future = waiter.create_result_future().map_err(into_cmd_err!(CmdErrorCode::Failed))?;
96                {
97                    let version = header.version();
98                    let seq = header.seq();
99                    let cmd_code = header.cmd_code();
100                    let body_read = cmd_read;
101                    match cmd_handler.handle(remote_id.clone(), tunnel_id, header, CmdBody::from_reader(BufReader::new(body_read), body_len)).await {
102                        Ok(Some(mut body)) => {
103                            let mut write = write.get().await;
104                            let header = CmdHeader::<LEN, CMD>::new(version, true, seq, cmd_code, LEN::from_u64(body.len()).unwrap());
105                            let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
106                            if buf.len() > 255 {
107                                return Err(cmd_err!(CmdErrorCode::RawCodecError, "header too long"));
108                            }
109                            write.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
110                            write.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
111                            tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
112                            write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
113                        }
114                        Err(e) => {
115                            log::error!("handle cmd error: {:?}", e);
116                        }
117                        _ => {}
118                    }
119                };
120                reader = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
121                // }
122            }
123            Ok(())
124        }.await;
125        ret
126    });
127    recv_handle
128}