1use std::panic::RefUnwindSafe;
2use crate::vertx::{cm::ClusterManager, message::Message, EventBus};
3use log::{error, info};
4use std::sync::Arc;
5use crossbeam_channel::Sender;
6use tokio::net::TcpListener;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8pub struct NetServer<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> {
12 pub port: u16,
13 event_bus: Option<Arc<EventBus<CM>>>,
14}
15
16impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> NetServer<CM> {
17 pub(crate) fn new(event_bus: Option<Arc<EventBus<CM>>>) -> &'static mut NetServer<CM> {
18 Box::leak(Box::new(NetServer::<CM> { port: 0, event_bus }))
19 }
20
21 pub(crate) async fn listen_for_message<OP>(&mut self, port: u16, op: OP)
22 where
23 OP: Fn(Vec<u8>, Sender<Arc<Message>>) -> Vec<u8> + 'static + Send + Sync + Copy,
24 {
25 let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await.unwrap();
26 self.port = listener.local_addr().unwrap().port();
27
28 let ev = self.event_bus.clone();
29 let ev = ev.unwrap();
30 let sender = ev.sender.lock();
31
32 let clonse_sender = sender.clone();
33 tokio::spawn(async move {
34 loop {
35 let inner_sender = clonse_sender.clone();
36 let (mut socket, _) = listener.accept().await.unwrap();
37 tokio::spawn(async move {
38 loop {
39 let inner_sender = inner_sender.clone();
40 let mut size = [0; 4];
41 #[allow(unused_assignments)]
42 let mut len = 0;
43 let _n = match socket.read(&mut size).await {
44 Ok(n) if n == 0 => return,
45 Ok(_n) => {
46 len = i32::from_be_bytes(size);
47 }
48 Err(e) => {
49 error!("failed to read from socket; err = {:?}", e);
50 return;
51 }
52 };
53 let mut buf = Vec::with_capacity(len as usize);
54 let _n = match socket.read_buf(&mut buf).await {
55 Ok(n) if n == 0 => return,
56 Ok(n) => &buf[0..n],
57 Err(e) => {
58 error!("failed to read from socket; err = {:?}", e);
59 return;
60 }
61 };
62 let bytes_as_vec = buf.to_vec();
63 if bytes_as_vec[1] == 1 {
64 if let Err(e) = socket.write_all(b"pong").await {
65 error!("failed to write to socket; err = {:?}", e);
66 return;
67 }
68 } else {
69 let data = op(bytes_as_vec, inner_sender);
70 if let Err(e) = socket.write_all(&data).await {
71 error!("failed to write to socket; err = {:?}", e);
72 return;
73 }
74 }
75 }
76 });
77 }
78 } );
79 }
80
81 pub async fn listen<OP>(&'static mut self, port: u16, op: OP)
82 where
83 OP: Fn(&Vec<u8>, Arc<EventBus<CM>>) -> Vec<u8> + 'static + Send + Sync + Copy,
84 {
86 let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await.unwrap();
87 self.port = listener.local_addr().unwrap().port();
88 info!("start net_server on tcp://0.0.0.0:{}", self.port);
89 tokio::spawn(async move {
90 loop {
91 let (mut socket, _) = listener.accept().await.unwrap();
92 let ev = self.event_bus.as_ref().unwrap().clone();
93 tokio::spawn(async move {
94 loop {
95 let local_ev = ev.clone();
96 let mut request: Vec<u8> = vec![];
97 let mut buf = [0; 2048];
98 let _n = match socket.read(&mut buf).await {
99 Ok(n) if n == 0 => return,
100 Ok(n) => request.extend(&buf[0..n]),
101 Err(e) => {
102 error!("failed to read from socket; err = {:?}", e);
103 return;
104 }
105 };
106 let data = op(&request, local_ev);
107 if let Err(e) = socket.write_all(&data).await {
108 error!("failed to write to socket; err = {:?}", e);
109 return;
110 }
111 }
112 });
113 }
114 });
115 }
116}