Skip to main content

sfo_cmd_server/node/
mod.rs

1mod classified_node;
2mod node;
3
4use crate::client::{CmdSend, SendGuard};
5use crate::cmd::CmdBodyRead;
6use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
7use crate::{
8    CmdBody, CmdHandler, CmdHeader, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId,
9};
10use async_named_locker::ObjectHolder;
11use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
12pub use classified_node::*;
13pub use node::*;
14use num::{FromPrimitive, ToPrimitive};
15use sfo_pool::WorkerClassification;
16use sfo_split::{RHalf, WHalf};
17use std::fmt::Debug;
18use std::hash::Hash;
19use std::ops::DerefMut;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
23use tokio::task::JoinHandle;
24
25#[async_trait::async_trait]
26pub trait CmdNode<
27    LEN: RawEncode
28        + for<'a> RawDecode<'a>
29        + Copy
30        + RawFixedBytes
31        + Sync
32        + Send
33        + 'static
34        + FromPrimitive
35        + ToPrimitive,
36    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
37    M: CmdTunnelMeta,
38    S: CmdSend<M>,
39    G: SendGuard<M, S>,
40>
41{
42    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
43    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
44    async fn send_with_resp(
45        &self,
46        peer_id: &PeerId,
47        cmd: CMD,
48        version: u8,
49        body: &[u8],
50        timeout: Duration,
51    ) -> CmdResult<CmdBody>;
52    async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]])
53    -> CmdResult<()>;
54    async fn send2_with_resp(
55        &self,
56        peer_id: &PeerId,
57        cmd: CMD,
58        version: u8,
59        body: &[&[u8]],
60        timeout: Duration,
61    ) -> CmdResult<CmdBody>;
62    async fn send_cmd(
63        &self,
64        peer_id: &PeerId,
65        cmd: CMD,
66        version: u8,
67        body: CmdBody,
68    ) -> CmdResult<()>;
69    async fn send_cmd_with_resp(
70        &self,
71        peer_id: &PeerId,
72        cmd: CMD,
73        version: u8,
74        body: CmdBody,
75        timeout: Duration,
76    ) -> CmdResult<CmdBody>;
77    async fn send_by_specify_tunnel(
78        &self,
79        peer_id: &PeerId,
80        tunnel_id: TunnelId,
81        cmd: CMD,
82        version: u8,
83        body: &[u8],
84    ) -> CmdResult<()>;
85    async fn send_by_specify_tunnel_with_resp(
86        &self,
87        peer_id: &PeerId,
88        tunnel_id: TunnelId,
89        cmd: CMD,
90        version: u8,
91        body: &[u8],
92        timeout: Duration,
93    ) -> CmdResult<CmdBody>;
94    async fn send2_by_specify_tunnel(
95        &self,
96        peer_id: &PeerId,
97        tunnel_id: TunnelId,
98        cmd: CMD,
99        version: u8,
100        body: &[&[u8]],
101    ) -> CmdResult<()>;
102    async fn send2_by_specify_tunnel_with_resp(
103        &self,
104        peer_id: &PeerId,
105        tunnel_id: TunnelId,
106        cmd: CMD,
107        version: u8,
108        body: &[&[u8]],
109        timeout: Duration,
110    ) -> CmdResult<CmdBody>;
111    async fn send_cmd_by_specify_tunnel(
112        &self,
113        peer_id: &PeerId,
114        tunnel_id: TunnelId,
115        cmd: CMD,
116        version: u8,
117        body: CmdBody,
118    ) -> CmdResult<()>;
119    async fn send_cmd_by_specify_tunnel_with_resp(
120        &self,
121        peer_id: &PeerId,
122        tunnel_id: TunnelId,
123        cmd: CMD,
124        version: u8,
125        body: CmdBody,
126        timeout: Duration,
127    ) -> CmdResult<CmdBody>;
128    async fn clear_all_tunnel(&self);
129    async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<G>;
130}
131
132#[async_trait::async_trait]
133pub trait ClassifiedCmdNode<
134    LEN: RawEncode
135        + for<'a> RawDecode<'a>
136        + Copy
137        + RawFixedBytes
138        + Sync
139        + Send
140        + 'static
141        + FromPrimitive
142        + ToPrimitive,
143    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
144    C: WorkerClassification,
145    M: CmdTunnelMeta,
146    S: CmdSend<M>,
147    G: SendGuard<M, S>,
148>: CmdNode<LEN, CMD, M, S, G>
149{
150    async fn send_by_classified_tunnel(
151        &self,
152        classification: C,
153        cmd: CMD,
154        version: u8,
155        body: &[u8],
156    ) -> CmdResult<()>;
157    async fn send_by_classified_tunnel_with_resp(
158        &self,
159        classification: C,
160        cmd: CMD,
161        version: u8,
162        body: &[u8],
163        timeout: Duration,
164    ) -> CmdResult<CmdBody>;
165    async fn send2_by_classified_tunnel(
166        &self,
167        classification: C,
168        cmd: CMD,
169        version: u8,
170        body: &[&[u8]],
171    ) -> CmdResult<()>;
172    async fn send2_by_classified_tunnel_with_resp(
173        &self,
174        classification: C,
175        cmd: CMD,
176        version: u8,
177        body: &[&[u8]],
178        timeout: Duration,
179    ) -> CmdResult<CmdBody>;
180    async fn send_cmd_by_classified_tunnel(
181        &self,
182        classification: C,
183        cmd: CMD,
184        version: u8,
185        body: CmdBody,
186    ) -> CmdResult<()>;
187    async fn send_cmd_by_classified_tunnel_with_resp(
188        &self,
189        classification: C,
190        cmd: CMD,
191        version: u8,
192        body: CmdBody,
193        timeout: Duration,
194    ) -> CmdResult<CmdBody>;
195    async fn send_by_peer_classified_tunnel(
196        &self,
197        peer_id: &PeerId,
198        classification: C,
199        cmd: CMD,
200        version: u8,
201        body: &[u8],
202    ) -> CmdResult<()>;
203    async fn send_by_peer_classified_tunnel_with_resp(
204        &self,
205        peer_id: &PeerId,
206        classification: C,
207        cmd: CMD,
208        version: u8,
209        body: &[u8],
210        timeout: Duration,
211    ) -> CmdResult<CmdBody>;
212    async fn send2_by_peer_classified_tunnel(
213        &self,
214        peer_id: &PeerId,
215        classification: C,
216        cmd: CMD,
217        version: u8,
218        body: &[&[u8]],
219    ) -> CmdResult<()>;
220    async fn send2_by_peer_classified_tunnel_with_resp(
221        &self,
222        peer_id: &PeerId,
223        classification: C,
224        cmd: CMD,
225        version: u8,
226        body: &[&[u8]],
227        timeout: Duration,
228    ) -> CmdResult<CmdBody>;
229    async fn send_cmd_by_peer_classified_tunnel(
230        &self,
231        peer_id: &PeerId,
232        classification: C,
233        cmd: CMD,
234        version: u8,
235        body: CmdBody,
236    ) -> CmdResult<()>;
237    async fn send_cmd_by_peer_classified_tunnel_with_resp(
238        &self,
239        peer_id: &PeerId,
240        classification: C,
241        cmd: CMD,
242        version: u8,
243        body: CmdBody,
244        timeout: Duration,
245    ) -> CmdResult<CmdBody>;
246    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
247    async fn find_tunnel_id_by_peer_classified(
248        &self,
249        peer_id: &PeerId,
250        classification: C,
251    ) -> CmdResult<TunnelId>;
252    async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
253    async fn get_send_by_peer_classified(
254        &self,
255        peer_id: &PeerId,
256        classification: C,
257    ) -> CmdResult<G>;
258}
259
260pub(crate) fn create_recv_handle<
261    M: CmdTunnelMeta,
262    R: CmdTunnelRead<M>,
263    W: CmdTunnelWrite<M>,
264    LEN: RawEncode
265        + for<'a> RawDecode<'a>
266        + Copy
267        + Send
268        + Sync
269        + 'static
270        + FromPrimitive
271        + ToPrimitive
272        + RawFixedBytes,
273    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
274>(
275    mut reader: RHalf<R, W>,
276    write: ObjectHolder<WHalf<R, W>>,
277    tunnel_id: TunnelId,
278    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
279) -> JoinHandle<CmdResult<()>> {
280    let recv_handle = tokio::spawn(async move {
281        let ret: CmdResult<()> = async move {
282            let local_id = reader.get_local_peer_id();
283            let remote_id = reader.get_remote_peer_id();
284            loop {
285                log::trace!("tunnel {:?} enter recv proc", tunnel_id);
286                let header_len = reader
287                    .read_u8()
288                    .await
289                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
290                log::trace!("tunnel {:?} recv cmd len {}", tunnel_id, header_len);
291                let mut header = vec![0u8; header_len as usize];
292                let n = reader
293                    .read_exact(&mut header)
294                    .await
295                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
296                if n == 0 {
297                    break;
298                }
299                let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
300                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
301                log::trace!(
302                    "tunnel {:?} recv cmd {:?} from {} len {}",
303                    tunnel_id,
304                    header.cmd_code(),
305                    remote_id.to_base36(),
306                    header.pkg_len().to_u64().unwrap()
307                );
308                let body_len = header.pkg_len().to_u64().unwrap();
309                let cmd_read =
310                    CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
311                let waiter = cmd_read.get_waiter();
312                let future = waiter
313                    .create_result_future()
314                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
315                {
316                    let version = header.version();
317                    let seq = header.seq();
318                    let cmd_code = header.cmd_code();
319                    let body_read = cmd_read;
320                    match cmd_handler
321                        .handle(
322                            local_id.clone(),
323                            remote_id.clone(),
324                            tunnel_id,
325                            header,
326                            CmdBody::from_reader(BufReader::new(body_read), body_len),
327                        )
328                        .await
329                    {
330                        Ok(Some(mut body)) => {
331                            let mut write = write.get().await;
332                            let header = CmdHeader::<LEN, CMD>::new(
333                                version,
334                                true,
335                                seq,
336                                cmd_code,
337                                LEN::from_u64(body.len()).unwrap(),
338                            );
339                            let buf = header
340                                .to_vec()
341                                .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
342                            if buf.len() > 255 {
343                                return Err(cmd_err!(
344                                    CmdErrorCode::RawCodecError,
345                                    "header too long"
346                                ));
347                            }
348                            write
349                                .write_u8(buf.len() as u8)
350                                .await
351                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
352                            write
353                                .write_all(buf.as_slice())
354                                .await
355                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
356                            tokio::io::copy(&mut body, write.deref_mut().deref_mut())
357                                .await
358                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
359                            write
360                                .flush()
361                                .await
362                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
363                        }
364                        Err(e) => {
365                            log::error!("handle cmd error: {:?}", e);
366                        }
367                        _ => {}
368                    }
369                };
370                reader = future
371                    .await
372                    .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
373                // }
374            }
375            Ok(())
376        }
377        .await;
378        ret
379    });
380    recv_handle
381}