Skip to main content

sfo_cmd_server/node/
mod.rs

1mod classified_node;
2mod node;
3
4use crate::client::{CmdSend, SendGuard};
5use crate::cmd::CmdBodyRead;
6use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
7use crate::{
8    CmdBody, CmdHandler, CmdHeader, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId,
9};
10use async_named_locker::ObjectHolder;
11use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
12pub use classified_node::*;
13pub use node::*;
14use num::{FromPrimitive, ToPrimitive};
15use sfo_pool::WorkerClassification;
16use sfo_split::{RHalf, WHalf};
17use std::fmt::Debug;
18use std::hash::Hash;
19use std::ops::DerefMut;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
23use tokio::task::JoinHandle;
24
25#[async_trait::async_trait]
26pub trait CmdNode<
27    LEN: RawEncode
28        + for<'a> RawDecode<'a>
29        + Copy
30        + RawFixedBytes
31        + Sync
32        + Send
33        + 'static
34        + FromPrimitive
35        + ToPrimitive,
36    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
37    M: CmdTunnelMeta,
38    S: CmdSend<M>,
39    G: SendGuard<M, S>,
40>
41{
42    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
43    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
44    async fn send_with_resp(
45        &self,
46        peer_id: &PeerId,
47        cmd: CMD,
48        version: u8,
49        body: &[u8],
50        timeout: Duration,
51    ) -> CmdResult<CmdBody>;
52    async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]])
53    -> CmdResult<()>;
54    async fn send2_with_resp(
55        &self,
56        peer_id: &PeerId,
57        cmd: CMD,
58        version: u8,
59        body: &[&[u8]],
60        timeout: Duration,
61    ) -> CmdResult<CmdBody>;
62    async fn send_cmd(
63        &self,
64        peer_id: &PeerId,
65        cmd: CMD,
66        version: u8,
67        body: CmdBody,
68    ) -> CmdResult<()>;
69    async fn send_cmd_with_resp(
70        &self,
71        peer_id: &PeerId,
72        cmd: CMD,
73        version: u8,
74        body: CmdBody,
75        timeout: Duration,
76    ) -> CmdResult<CmdBody>;
77    async fn send_by_specify_tunnel(
78        &self,
79        peer_id: &PeerId,
80        tunnel_id: TunnelId,
81        cmd: CMD,
82        version: u8,
83        body: &[u8],
84    ) -> CmdResult<()>;
85    async fn send_by_specify_tunnel_with_resp(
86        &self,
87        peer_id: &PeerId,
88        tunnel_id: TunnelId,
89        cmd: CMD,
90        version: u8,
91        body: &[u8],
92        timeout: Duration,
93    ) -> CmdResult<CmdBody>;
94    async fn send2_by_specify_tunnel(
95        &self,
96        peer_id: &PeerId,
97        tunnel_id: TunnelId,
98        cmd: CMD,
99        version: u8,
100        body: &[&[u8]],
101    ) -> CmdResult<()>;
102    async fn send2_by_specify_tunnel_with_resp(
103        &self,
104        peer_id: &PeerId,
105        tunnel_id: TunnelId,
106        cmd: CMD,
107        version: u8,
108        body: &[&[u8]],
109        timeout: Duration,
110    ) -> CmdResult<CmdBody>;
111    async fn send_cmd_by_specify_tunnel(
112        &self,
113        peer_id: &PeerId,
114        tunnel_id: TunnelId,
115        cmd: CMD,
116        version: u8,
117        body: CmdBody,
118    ) -> CmdResult<()>;
119    async fn send_cmd_by_specify_tunnel_with_resp(
120        &self,
121        peer_id: &PeerId,
122        tunnel_id: TunnelId,
123        cmd: CMD,
124        version: u8,
125        body: CmdBody,
126        timeout: Duration,
127    ) -> CmdResult<CmdBody>;
128    async fn clear_all_tunnel(&self);
129    async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<G>;
130}
131
132#[async_trait::async_trait]
133pub trait ClassifiedCmdNode<
134    LEN: RawEncode
135        + for<'a> RawDecode<'a>
136        + Copy
137        + RawFixedBytes
138        + Sync
139        + Send
140        + 'static
141        + FromPrimitive
142        + ToPrimitive,
143    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
144    C: WorkerClassification,
145    M: CmdTunnelMeta,
146    S: CmdSend<M>,
147    G: SendGuard<M, S>,
148>: CmdNode<LEN, CMD, M, S, G>
149{
150    async fn send_by_classified_tunnel(
151        &self,
152        classification: C,
153        cmd: CMD,
154        version: u8,
155        body: &[u8],
156    ) -> CmdResult<()>;
157    async fn send_by_classified_tunnel_with_resp(
158        &self,
159        classification: C,
160        cmd: CMD,
161        version: u8,
162        body: &[u8],
163        timeout: Duration,
164    ) -> CmdResult<CmdBody>;
165    async fn send2_by_classified_tunnel(
166        &self,
167        classification: C,
168        cmd: CMD,
169        version: u8,
170        body: &[&[u8]],
171    ) -> CmdResult<()>;
172    async fn send2_by_classified_tunnel_with_resp(
173        &self,
174        classification: C,
175        cmd: CMD,
176        version: u8,
177        body: &[&[u8]],
178        timeout: Duration,
179    ) -> CmdResult<CmdBody>;
180    async fn send_cmd_by_classified_tunnel(
181        &self,
182        classification: C,
183        cmd: CMD,
184        version: u8,
185        body: CmdBody,
186    ) -> CmdResult<()>;
187    async fn send_cmd_by_classified_tunnel_with_resp(
188        &self,
189        classification: C,
190        cmd: CMD,
191        version: u8,
192        body: CmdBody,
193        timeout: Duration,
194    ) -> CmdResult<CmdBody>;
195    async fn send_by_peer_classified_tunnel(
196        &self,
197        peer_id: &PeerId,
198        classification: C,
199        cmd: CMD,
200        version: u8,
201        body: &[u8],
202    ) -> CmdResult<()>;
203    async fn send_by_peer_classified_tunnel_with_resp(
204        &self,
205        peer_id: &PeerId,
206        classification: C,
207        cmd: CMD,
208        version: u8,
209        body: &[u8],
210        timeout: Duration,
211    ) -> CmdResult<CmdBody>;
212    async fn send2_by_peer_classified_tunnel(
213        &self,
214        peer_id: &PeerId,
215        classification: C,
216        cmd: CMD,
217        version: u8,
218        body: &[&[u8]],
219    ) -> CmdResult<()>;
220    async fn send2_by_peer_classified_tunnel_with_resp(
221        &self,
222        peer_id: &PeerId,
223        classification: C,
224        cmd: CMD,
225        version: u8,
226        body: &[&[u8]],
227        timeout: Duration,
228    ) -> CmdResult<CmdBody>;
229    async fn send_cmd_by_peer_classified_tunnel(
230        &self,
231        peer_id: &PeerId,
232        classification: C,
233        cmd: CMD,
234        version: u8,
235        body: CmdBody,
236    ) -> CmdResult<()>;
237    async fn send_cmd_by_peer_classified_tunnel_with_resp(
238        &self,
239        peer_id: &PeerId,
240        classification: C,
241        cmd: CMD,
242        version: u8,
243        body: CmdBody,
244        timeout: Duration,
245    ) -> CmdResult<CmdBody>;
246    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
247    async fn find_tunnel_id_by_peer_classified(
248        &self,
249        peer_id: &PeerId,
250        classification: C,
251    ) -> CmdResult<TunnelId>;
252    async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
253    async fn get_send_by_peer_classified(
254        &self,
255        peer_id: &PeerId,
256        classification: C,
257    ) -> CmdResult<G>;
258}
259
260pub(crate) fn create_recv_handle<
261    M: CmdTunnelMeta,
262    R: CmdTunnelRead<M>,
263    W: CmdTunnelWrite<M>,
264    LEN: RawEncode
265        + for<'a> RawDecode<'a>
266        + Copy
267        + Send
268        + Sync
269        + 'static
270        + FromPrimitive
271        + ToPrimitive
272        + RawFixedBytes,
273    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
274>(
275    mut reader: RHalf<R, W>,
276    write: ObjectHolder<WHalf<R, W>>,
277    tunnel_id: TunnelId,
278    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
279) -> JoinHandle<CmdResult<()>> {
280    let recv_handle = tokio::spawn(async move {
281        let ret: CmdResult<()> = async move {
282            let remote_id = reader.get_remote_peer_id();
283            loop {
284                log::trace!("tunnel {:?} enter recv proc", tunnel_id);
285                let header_len = reader
286                    .read_u8()
287                    .await
288                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
289                log::trace!("tunnel {:?} recv cmd len {}", tunnel_id, header_len);
290                let mut header = vec![0u8; header_len as usize];
291                let n = reader
292                    .read_exact(&mut header)
293                    .await
294                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
295                if n == 0 {
296                    break;
297                }
298                let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
299                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
300                log::trace!(
301                    "tunnel {:?} recv cmd {:?} from {} len {}",
302                    tunnel_id,
303                    header.cmd_code(),
304                    remote_id.to_base36(),
305                    header.pkg_len().to_u64().unwrap()
306                );
307                let body_len = header.pkg_len().to_u64().unwrap();
308                let cmd_read =
309                    CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
310                let waiter = cmd_read.get_waiter();
311                let future = waiter
312                    .create_result_future()
313                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
314                {
315                    let version = header.version();
316                    let seq = header.seq();
317                    let cmd_code = header.cmd_code();
318                    let body_read = cmd_read;
319                    match cmd_handler
320                        .handle(
321                            remote_id.clone(),
322                            tunnel_id,
323                            header,
324                            CmdBody::from_reader(BufReader::new(body_read), body_len),
325                        )
326                        .await
327                    {
328                        Ok(Some(mut body)) => {
329                            let mut write = write.get().await;
330                            let header = CmdHeader::<LEN, CMD>::new(
331                                version,
332                                true,
333                                seq,
334                                cmd_code,
335                                LEN::from_u64(body.len()).unwrap(),
336                            );
337                            let buf = header
338                                .to_vec()
339                                .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
340                            if buf.len() > 255 {
341                                return Err(cmd_err!(
342                                    CmdErrorCode::RawCodecError,
343                                    "header too long"
344                                ));
345                            }
346                            write
347                                .write_u8(buf.len() as u8)
348                                .await
349                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
350                            write
351                                .write_all(buf.as_slice())
352                                .await
353                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
354                            tokio::io::copy(&mut body, write.deref_mut().deref_mut())
355                                .await
356                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
357                            write
358                                .flush()
359                                .await
360                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
361                        }
362                        Err(e) => {
363                            log::error!("handle cmd error: {:?}", e);
364                        }
365                        _ => {}
366                    }
367                };
368                reader = future
369                    .await
370                    .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
371                // }
372            }
373            Ok(())
374        }
375        .await;
376        ret
377    });
378    recv_handle
379}