1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
use crate::vertx::{cm::ClusterManager, message::Message, EventBus, RUNTIME}; use crossbeam_channel::Sender; use log::{error, info}; use std::sync::Arc; use tokio::net::TcpListener; use tokio::io::{AsyncReadExt, AsyncWriteExt}; pub struct NetServer<CM: 'static + ClusterManager + Send + Sync> { pub port: u16, event_bus: Option<Arc<EventBus<CM>>>, } impl<CM: 'static + ClusterManager + Send + Sync> NetServer<CM> { pub(crate) fn new(event_bus: Option<Arc<EventBus<CM>>>) -> &'static mut NetServer<CM> { Box::leak(Box::new(NetServer::<CM> { port: 0, event_bus })) } pub(crate) fn listen_for_message<OP>(&mut self, port: u16, op: OP) where OP: Fn(Vec<u8>, Sender<Message>) -> Vec<u8> + 'static + Send + Sync + Copy, { let listener = RUNTIME .block_on(TcpListener::bind(format!("0.0.0.0:{}", port))) .unwrap(); self.port = listener.local_addr().unwrap().port(); let ev = self.event_bus.clone(); let ev = ev.unwrap().clone(); let sender = ev.sender.lock().unwrap(); let clonse_sender = sender.clone(); std::thread::spawn(move || loop { let inner_sender = clonse_sender.clone(); let (mut socket, _) = RUNTIME.block_on(listener.accept()).unwrap(); RUNTIME.spawn(async move { loop { let inner_sender = inner_sender.clone(); let mut size = [0; 4]; #[allow(unused_assignments)] let mut len = 0; let _n = match socket.read(&mut size).await { Ok(n) if n == 0 => return, Ok(_n) => { len = i32::from_be_bytes(size); } Err(e) => { error!("failed to read from socket; err = {:?}", e); return; } }; let mut buf = Vec::with_capacity(len as usize); let _n = match socket.read_buf(&mut buf).await { Ok(n) if n == 0 => return, Ok(n) => &buf[0..n], Err(e) => { error!("failed to read from socket; err = {:?}", e); return; } }; let bytes_as_vec = buf.to_vec(); let bytes_as_string = String::from_utf8_lossy(&bytes_as_vec); if bytes_as_string.contains("ping") { if let Err(e) = socket.write_all(b"pong").await { error!("failed to write to socket; err = {:?}", e); return; } } else { let data = op(bytes_as_vec, inner_sender); if let Err(e) = socket.write_all(&data).await { error!("failed to write to socket; err = {:?}", e); return; } } } }); }); } pub fn listen<OP>(&'static mut self, port: u16, op: OP) where OP: Fn(&Vec<u8>, Arc<EventBus<CM>>) -> Vec<u8> + 'static + Send + Sync + Copy, { let listener = RUNTIME .block_on(TcpListener::bind(format!("0.0.0.0:{}", port))) .unwrap(); self.port = listener.local_addr().unwrap().port(); info!("start net_server on tcp://0.0.0.0:{}", self.port); std::thread::spawn(move || loop { let (mut socket, _) = RUNTIME.block_on(listener.accept()).unwrap(); let ev = self.event_bus.as_ref().unwrap().clone(); RUNTIME.spawn(async move { loop { let local_ev = ev.clone(); let mut request: Vec<u8> = vec![]; let mut buf = [0; 2048]; let _n = match socket.read(&mut buf).await { Ok(n) if n == 0 => return, Ok(n) => request.extend(&buf[0..n]), Err(e) => { error!("failed to read from socket; err = {:?}", e); return; } }; let data = op(&request, local_ev); if let Err(e) = socket.write_all(&data).await { error!("failed to write to socket; err = {:?}", e); return; } } }); }); } }