// sfo_cmd_server/node/mod.rs

1mod classified_node;
2mod node;
3
4use crate::client::{CmdSend, SendGuard};
5use crate::cmd::CmdBodyRead;
6use crate::errors::{CmdErrorCode, CmdResult, cmd_err, into_cmd_err};
7use crate::{
8    CmdBody, CmdHandler, CmdHeader, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId, TunnelId,
9};
10use async_named_locker::ObjectHolder;
11use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
12pub use classified_node::*;
13pub use node::*;
14use num::{FromPrimitive, ToPrimitive};
15use sfo_pool::WorkerClassification;
16use sfo_split::{RHalf, WHalf};
17use std::fmt::Debug;
18use std::hash::Hash;
19use std::ops::DerefMut;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
23use tokio::task::JoinHandle;
24
25#[async_trait::async_trait]
26pub trait CmdNode<
27    LEN: RawEncode
28        + for<'a> RawDecode<'a>
29        + Copy
30        + RawFixedBytes
31        + Sync
32        + Send
33        + 'static
34        + FromPrimitive
35        + ToPrimitive,
36    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
37    M: CmdTunnelMeta,
38    S: CmdSend<M>,
39    G: SendGuard<M, S>,
40>
41{
42    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
43    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
44    async fn send_with_resp(
45        &self,
46        peer_id: &PeerId,
47        cmd: CMD,
48        version: u8,
49        body: &[u8],
50        timeout: Duration,
51    ) -> CmdResult<CmdBody>;
52    async fn send_parts(
53        &self,
54        peer_id: &PeerId,
55        cmd: CMD,
56        version: u8,
57        body: &[&[u8]],
58    ) -> CmdResult<()>;
59    async fn send_parts_with_resp(
60        &self,
61        peer_id: &PeerId,
62        cmd: CMD,
63        version: u8,
64        body: &[&[u8]],
65        timeout: Duration,
66    ) -> CmdResult<CmdBody>;
67    #[deprecated(note = "use send_parts instead")]
68    async fn send2(
69        &self,
70        peer_id: &PeerId,
71        cmd: CMD,
72        version: u8,
73        body: &[&[u8]],
74    ) -> CmdResult<()> {
75        self.send_parts(peer_id, cmd, version, body).await
76    }
77    #[deprecated(note = "use send_parts_with_resp instead")]
78    async fn send2_with_resp(
79        &self,
80        peer_id: &PeerId,
81        cmd: CMD,
82        version: u8,
83        body: &[&[u8]],
84        timeout: Duration,
85    ) -> CmdResult<CmdBody> {
86        self.send_parts_with_resp(peer_id, cmd, version, body, timeout)
87            .await
88    }
89    async fn send_cmd(
90        &self,
91        peer_id: &PeerId,
92        cmd: CMD,
93        version: u8,
94        body: CmdBody,
95    ) -> CmdResult<()>;
96    async fn send_cmd_with_resp(
97        &self,
98        peer_id: &PeerId,
99        cmd: CMD,
100        version: u8,
101        body: CmdBody,
102        timeout: Duration,
103    ) -> CmdResult<CmdBody>;
104    async fn send_by_specify_tunnel(
105        &self,
106        peer_id: &PeerId,
107        tunnel_id: TunnelId,
108        cmd: CMD,
109        version: u8,
110        body: &[u8],
111    ) -> CmdResult<()>;
112    async fn send_by_specify_tunnel_with_resp(
113        &self,
114        peer_id: &PeerId,
115        tunnel_id: TunnelId,
116        cmd: CMD,
117        version: u8,
118        body: &[u8],
119        timeout: Duration,
120    ) -> CmdResult<CmdBody>;
121    async fn send_parts_by_specify_tunnel(
122        &self,
123        peer_id: &PeerId,
124        tunnel_id: TunnelId,
125        cmd: CMD,
126        version: u8,
127        body: &[&[u8]],
128    ) -> CmdResult<()>;
129    async fn send_parts_by_specify_tunnel_with_resp(
130        &self,
131        peer_id: &PeerId,
132        tunnel_id: TunnelId,
133        cmd: CMD,
134        version: u8,
135        body: &[&[u8]],
136        timeout: Duration,
137    ) -> CmdResult<CmdBody>;
138    #[deprecated(note = "use send_parts_by_specify_tunnel instead")]
139    async fn send2_by_specify_tunnel(
140        &self,
141        peer_id: &PeerId,
142        tunnel_id: TunnelId,
143        cmd: CMD,
144        version: u8,
145        body: &[&[u8]],
146    ) -> CmdResult<()> {
147        self.send_parts_by_specify_tunnel(peer_id, tunnel_id, cmd, version, body)
148            .await
149    }
150    #[deprecated(note = "use send_parts_by_specify_tunnel_with_resp instead")]
151    async fn send2_by_specify_tunnel_with_resp(
152        &self,
153        peer_id: &PeerId,
154        tunnel_id: TunnelId,
155        cmd: CMD,
156        version: u8,
157        body: &[&[u8]],
158        timeout: Duration,
159    ) -> CmdResult<CmdBody> {
160        self.send_parts_by_specify_tunnel_with_resp(peer_id, tunnel_id, cmd, version, body, timeout)
161            .await
162    }
163    async fn send_cmd_by_specify_tunnel(
164        &self,
165        peer_id: &PeerId,
166        tunnel_id: TunnelId,
167        cmd: CMD,
168        version: u8,
169        body: CmdBody,
170    ) -> CmdResult<()>;
171    async fn send_cmd_by_specify_tunnel_with_resp(
172        &self,
173        peer_id: &PeerId,
174        tunnel_id: TunnelId,
175        cmd: CMD,
176        version: u8,
177        body: CmdBody,
178        timeout: Duration,
179    ) -> CmdResult<CmdBody>;
180    async fn clear_all_tunnel(&self);
181    async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<G>;
182}
183
184#[async_trait::async_trait]
185pub trait ClassifiedCmdNode<
186    LEN: RawEncode
187        + for<'a> RawDecode<'a>
188        + Copy
189        + RawFixedBytes
190        + Sync
191        + Send
192        + 'static
193        + FromPrimitive
194        + ToPrimitive,
195    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
196    C: WorkerClassification,
197    M: CmdTunnelMeta,
198    S: CmdSend<M>,
199    G: SendGuard<M, S>,
200>: CmdNode<LEN, CMD, M, S, G>
201{
202    async fn send_by_classified_tunnel(
203        &self,
204        classification: C,
205        cmd: CMD,
206        version: u8,
207        body: &[u8],
208    ) -> CmdResult<()>;
209    async fn send_by_classified_tunnel_with_resp(
210        &self,
211        classification: C,
212        cmd: CMD,
213        version: u8,
214        body: &[u8],
215        timeout: Duration,
216    ) -> CmdResult<CmdBody>;
217    async fn send_parts_by_classified_tunnel(
218        &self,
219        classification: C,
220        cmd: CMD,
221        version: u8,
222        body: &[&[u8]],
223    ) -> CmdResult<()>;
224    async fn send_parts_by_classified_tunnel_with_resp(
225        &self,
226        classification: C,
227        cmd: CMD,
228        version: u8,
229        body: &[&[u8]],
230        timeout: Duration,
231    ) -> CmdResult<CmdBody>;
232    #[deprecated(note = "use send_parts_by_classified_tunnel instead")]
233    async fn send2_by_classified_tunnel(
234        &self,
235        classification: C,
236        cmd: CMD,
237        version: u8,
238        body: &[&[u8]],
239    ) -> CmdResult<()> {
240        self.send_parts_by_classified_tunnel(classification, cmd, version, body)
241            .await
242    }
243    #[deprecated(note = "use send_parts_by_classified_tunnel_with_resp instead")]
244    async fn send2_by_classified_tunnel_with_resp(
245        &self,
246        classification: C,
247        cmd: CMD,
248        version: u8,
249        body: &[&[u8]],
250        timeout: Duration,
251    ) -> CmdResult<CmdBody> {
252        self.send_parts_by_classified_tunnel_with_resp(classification, cmd, version, body, timeout)
253            .await
254    }
255    async fn send_cmd_by_classified_tunnel(
256        &self,
257        classification: C,
258        cmd: CMD,
259        version: u8,
260        body: CmdBody,
261    ) -> CmdResult<()>;
262    async fn send_cmd_by_classified_tunnel_with_resp(
263        &self,
264        classification: C,
265        cmd: CMD,
266        version: u8,
267        body: CmdBody,
268        timeout: Duration,
269    ) -> CmdResult<CmdBody>;
270    async fn send_by_peer_classified_tunnel(
271        &self,
272        peer_id: &PeerId,
273        classification: C,
274        cmd: CMD,
275        version: u8,
276        body: &[u8],
277    ) -> CmdResult<()>;
278    async fn send_by_peer_classified_tunnel_with_resp(
279        &self,
280        peer_id: &PeerId,
281        classification: C,
282        cmd: CMD,
283        version: u8,
284        body: &[u8],
285        timeout: Duration,
286    ) -> CmdResult<CmdBody>;
287    async fn send_parts_by_peer_classified_tunnel(
288        &self,
289        peer_id: &PeerId,
290        classification: C,
291        cmd: CMD,
292        version: u8,
293        body: &[&[u8]],
294    ) -> CmdResult<()>;
295    async fn send_parts_by_peer_classified_tunnel_with_resp(
296        &self,
297        peer_id: &PeerId,
298        classification: C,
299        cmd: CMD,
300        version: u8,
301        body: &[&[u8]],
302        timeout: Duration,
303    ) -> CmdResult<CmdBody>;
304    #[deprecated(note = "use send_parts_by_peer_classified_tunnel instead")]
305    async fn send2_by_peer_classified_tunnel(
306        &self,
307        peer_id: &PeerId,
308        classification: C,
309        cmd: CMD,
310        version: u8,
311        body: &[&[u8]],
312    ) -> CmdResult<()> {
313        self.send_parts_by_peer_classified_tunnel(peer_id, classification, cmd, version, body)
314            .await
315    }
316    #[deprecated(note = "use send_parts_by_peer_classified_tunnel_with_resp instead")]
317    async fn send2_by_peer_classified_tunnel_with_resp(
318        &self,
319        peer_id: &PeerId,
320        classification: C,
321        cmd: CMD,
322        version: u8,
323        body: &[&[u8]],
324        timeout: Duration,
325    ) -> CmdResult<CmdBody> {
326        self.send_parts_by_peer_classified_tunnel_with_resp(
327            peer_id,
328            classification,
329            cmd,
330            version,
331            body,
332            timeout,
333        )
334        .await
335    }
336    async fn send_cmd_by_peer_classified_tunnel(
337        &self,
338        peer_id: &PeerId,
339        classification: C,
340        cmd: CMD,
341        version: u8,
342        body: CmdBody,
343    ) -> CmdResult<()>;
344    async fn send_cmd_by_peer_classified_tunnel_with_resp(
345        &self,
346        peer_id: &PeerId,
347        classification: C,
348        cmd: CMD,
349        version: u8,
350        body: CmdBody,
351        timeout: Duration,
352    ) -> CmdResult<CmdBody>;
353    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
354    async fn find_tunnel_id_by_peer_classified(
355        &self,
356        peer_id: &PeerId,
357        classification: C,
358    ) -> CmdResult<TunnelId>;
359    async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
360    async fn get_send_by_peer_classified(
361        &self,
362        peer_id: &PeerId,
363        classification: C,
364    ) -> CmdResult<G>;
365}
366
367pub(crate) fn create_recv_handle<
368    M: CmdTunnelMeta,
369    R: CmdTunnelRead<M>,
370    W: CmdTunnelWrite<M>,
371    LEN: RawEncode
372        + for<'a> RawDecode<'a>
373        + Copy
374        + Send
375        + Sync
376        + 'static
377        + FromPrimitive
378        + ToPrimitive
379        + RawFixedBytes,
380    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
381>(
382    mut reader: RHalf<R, W>,
383    write: ObjectHolder<WHalf<R, W>>,
384    tunnel_id: TunnelId,
385    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
386) -> JoinHandle<CmdResult<()>> {
387    let recv_handle = tokio::spawn(async move {
388        let ret: CmdResult<()> = async move {
389            let local_id = reader.get_local_peer_id();
390            let remote_id = reader.get_remote_peer_id();
391            loop {
392                log::trace!("tunnel {:?} enter recv proc", tunnel_id);
393                let header_len = reader
394                    .read_u8()
395                    .await
396                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
397                log::trace!("tunnel {:?} recv cmd len {}", tunnel_id, header_len);
398                let mut header = vec![0u8; header_len as usize];
399                let n = reader
400                    .read_exact(&mut header)
401                    .await
402                    .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
403                if n == 0 {
404                    break;
405                }
406                let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice())
407                    .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
408                log::trace!(
409                    "tunnel {:?} recv cmd {:?} from {} len {}",
410                    tunnel_id,
411                    header.cmd_code(),
412                    remote_id.to_base36(),
413                    header.pkg_len().to_u64().unwrap()
414                );
415                let body_len = header.pkg_len().to_u64().unwrap();
416                let cmd_read =
417                    CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
418                let waiter = cmd_read.get_waiter();
419                let future = waiter
420                    .create_result_future()
421                    .map_err(into_cmd_err!(CmdErrorCode::Failed))?;
422                {
423                    let version = header.version();
424                    let seq = header.seq();
425                    let cmd_code = header.cmd_code();
426                    let body_read = cmd_read;
427                    match cmd_handler
428                        .handle(
429                            local_id.clone(),
430                            remote_id.clone(),
431                            tunnel_id,
432                            header,
433                            CmdBody::from_reader(BufReader::new(body_read), body_len),
434                        )
435                        .await
436                    {
437                        Ok(Some(mut body)) => {
438                            let mut write = write.get().await;
439                            let header = CmdHeader::<LEN, CMD>::new(
440                                version,
441                                true,
442                                seq,
443                                cmd_code,
444                                LEN::from_u64(body.len()).unwrap(),
445                            );
446                            let buf = header
447                                .to_vec()
448                                .map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
449                            if buf.len() > 255 {
450                                return Err(cmd_err!(
451                                    CmdErrorCode::RawCodecError,
452                                    "header too long"
453                                ));
454                            }
455                            write
456                                .write_u8(buf.len() as u8)
457                                .await
458                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
459                            write
460                                .write_all(buf.as_slice())
461                                .await
462                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
463                            tokio::io::copy(&mut body, write.deref_mut().deref_mut())
464                                .await
465                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
466                            write
467                                .flush()
468                                .await
469                                .map_err(into_cmd_err!(CmdErrorCode::IoError))?;
470                        }
471                        Err(e) => {
472                            log::error!("handle cmd error: {:?}", e);
473                        }
474                        _ => {}
475                    }
476                };
477                reader = future
478                    .await
479                    .map_err(into_cmd_err!(CmdErrorCode::Failed))??;
480                // }
481            }
482            Ok(())
483        }
484        .await;
485        ret
486    });
487    recv_handle
488}