vertx_rust/net/
mod.rs

1use std::panic::RefUnwindSafe;
2use crate::vertx::{cm::ClusterManager, message::Message, EventBus};
3use log::{error, info};
4use std::sync::Arc;
5use crossbeam_channel::Sender;
6use tokio::net::TcpListener;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8// use tokio::sync::mpsc::Sender;
9
10
11pub struct NetServer<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> {
12    pub port: u16,
13    event_bus: Option<Arc<EventBus<CM>>>,
14}
15
16impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> NetServer<CM> {
17    pub(crate) fn new(event_bus: Option<Arc<EventBus<CM>>>) -> &'static mut NetServer<CM> {
18        Box::leak(Box::new(NetServer::<CM> { port: 0, event_bus }))
19    }
20
21    pub(crate) async fn listen_for_message<OP>(&mut self, port: u16, op: OP)
22    where
23        OP: Fn(Vec<u8>, Sender<Arc<Message>>) -> Vec<u8> + 'static + Send + Sync + Copy,
24    {
25        let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await.unwrap();
26        self.port = listener.local_addr().unwrap().port();
27
28        let ev = self.event_bus.clone();
29        let ev = ev.unwrap();
30        let sender = ev.sender.lock();
31
32        let clonse_sender = sender.clone();
33        tokio::spawn(async move {
34            loop {
35                let inner_sender = clonse_sender.clone();
36                let (mut socket, _) = listener.accept().await.unwrap();
37                tokio::spawn(async move {
38                    loop {
39                        let inner_sender = inner_sender.clone();
40                        let mut size = [0; 4];
41                        #[allow(unused_assignments)]
42                            let mut len = 0;
43                        let _n = match socket.read(&mut size).await {
44                            Ok(n) if n == 0 => return,
45                            Ok(_n) => {
46                                len = i32::from_be_bytes(size);
47                            }
48                            Err(e) => {
49                                error!("failed to read from socket; err = {:?}", e);
50                                return;
51                            }
52                        };
53                        let mut buf = Vec::with_capacity(len as usize);
54                        let _n = match socket.read_buf(&mut buf).await {
55                            Ok(n) if n == 0 => return,
56                            Ok(n) => &buf[0..n],
57                            Err(e) => {
58                                error!("failed to read from socket; err = {:?}", e);
59                                return;
60                            }
61                        };
62                        let bytes_as_vec = buf.to_vec();
63                        if bytes_as_vec[1] == 1 {
64                            if let Err(e) = socket.write_all(b"pong").await {
65                                error!("failed to write to socket; err = {:?}", e);
66                                return;
67                            }
68                        } else {
69                            let data = op(bytes_as_vec, inner_sender);
70                            if let Err(e) = socket.write_all(&data).await {
71                                error!("failed to write to socket; err = {:?}", e);
72                                return;
73                            }
74                        }
75                    }
76                });
77            }
78        } );
79    }
80
81    pub async fn listen<OP>(&'static mut self, port: u16, op: OP)
82    where
83        OP: Fn(&Vec<u8>, Arc<EventBus<CM>>) -> Vec<u8> + 'static + Send + Sync + Copy,
84        // T: Future<Output = Vec<u8>> + 'static + Send + Sync + Clone,
85    {
86        let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await.unwrap();
87        self.port = listener.local_addr().unwrap().port();
88        info!("start net_server on tcp://0.0.0.0:{}", self.port);
89        tokio::spawn(async move {
90            loop {
91                let (mut socket, _) = listener.accept().await.unwrap();
92                let ev = self.event_bus.as_ref().unwrap().clone();
93                tokio::spawn(async move {
94                    loop {
95                        let local_ev = ev.clone();
96                        let mut request: Vec<u8> = vec![];
97                        let mut buf = [0; 2048];
98                        let _n = match socket.read(&mut buf).await {
99                            Ok(n) if n == 0 => return,
100                            Ok(n) => request.extend(&buf[0..n]),
101                            Err(e) => {
102                                error!("failed to read from socket; err = {:?}", e);
103                                return;
104                            }
105                        };
106                        let data = op(&request, local_ev);
107                        if let Err(e) = socket.write_all(&data).await {
108                            error!("failed to write to socket; err = {:?}", e);
109                            return;
110                        }
111                    }
112                });
113            }
114        });
115    }
116}