geng_net/
server.rs

1use super::*;
2
3use std::sync::Mutex;
4
5pub trait App: Send + 'static {
6    type Client: Receiver<Self::ClientMessage>;
7    type ServerMessage: Message;
8    type ClientMessage: Message;
9    fn connect(&mut self, sender: Box<dyn Sender<Self::ServerMessage>>) -> Self::Client;
10}
11
12struct Handler<T: App> {
13    app: Arc<Mutex<T>>,
14    sender: ws::Sender,
15    client: Option<T::Client>,
16}
17
18struct BackgroundSender {
19    sender: std::sync::mpsc::Sender<Arc<Vec<u8>>>,
20}
21
22impl BackgroundSender {
23    fn new(ws_sender: ws::Sender) -> Self {
24        let (sender, receiver) = std::sync::mpsc::channel::<Arc<Vec<u8>>>();
25        std::thread::spawn(move || {
26            while let Ok(data) = receiver.recv() {
27                ws_sender
28                    .send(ws::Message::Binary((*data).clone()))
29                    .expect("Failed to send message");
30            }
31        });
32        Self { sender }
33    }
34}
35
36impl<T: Message> Sender<T> for BackgroundSender {
37    fn send_serialized(&mut self, data: Arc<Vec<u8>>) {
38        self.sender.send(data).expect("Failed to send message");
39    }
40}
41
42impl<T: App> ws::Handler for Handler<T> {
43    fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
44        self.client = Some(
45            self.app
46                .lock()
47                .unwrap()
48                .connect(Box::new(BackgroundSender::new(self.sender.clone()))),
49        );
50        Ok(())
51    }
52    fn on_message(&mut self, message: ws::Message) -> ws::Result<()> {
53        let message = match deserialize_message(&message.into_data()) {
54            Ok(message) => message,
55            Err(e) => {
56                return Err(ws::Error::new(ws::ErrorKind::Protocol, e.to_string()));
57            }
58        };
59        log::trace!("Received message from client: {:?}", message);
60        if let Some(client) = &mut self.client {
61            client.handle(message);
62        } else {
63            log::error!("WUT! received a message before handshake");
64        }
65        Ok(())
66    }
67}
68
69struct Factory<T: App> {
70    app: Arc<Mutex<T>>,
71}
72
73impl<T: App> Factory<T> {
74    fn new(app: T) -> Self {
75        Self {
76            app: Arc::new(Mutex::new(app)),
77        }
78    }
79}
80
81impl<T: App> ws::Factory for Factory<T> {
82    type Handler = Handler<T>;
83
84    fn connection_made(&mut self, sender: ws::Sender) -> Handler<T> {
85        log::info!("New connection");
86        Handler {
87            app: self.app.clone(),
88            sender,
89            client: None,
90        }
91    }
92}
93
94pub struct Server<T: App> {
95    ws: ws::WebSocket<Factory<T>>,
96}
97
98#[derive(Clone)]
99pub struct ServerHandle {
100    sender: ws::Sender,
101}
102
103impl ServerHandle {
104    pub fn shutdown(&self) {
105        self.sender.shutdown().expect("Failed to shutdown server");
106    }
107}
108
109impl<T: App> Server<T> {
110    pub fn new(app: T, addr: impl std::net::ToSocketAddrs + Debug + Copy) -> Self {
111        let factory = Factory::new(app);
112        let ws = ws::Builder::new()
113            .with_settings(ws::Settings {
114                max_connections: 10000,
115                // fragments_capacity: todo!(),
116                // fragments_grow: todo!(),
117                // max_fragment_size: todo!(),
118                // in_buffer_capacity: todo!(),
119                // in_buffer_grow: todo!(),
120                // out_buffer_capacity: todo!(),
121                // out_buffer_grow: todo!(),
122                panic_on_internal: false,
123                tcp_nodelay: true,
124                ..Default::default()
125            })
126            .build(factory)
127            .unwrap();
128        let ws = match ws.bind(addr) {
129            Ok(ws) => ws,
130            Err(e) => {
131                log::error!("Failed to bind server to {:?}: {}", addr, e);
132                panic!("{e:?}");
133            }
134        };
135        Self { ws }
136    }
137    pub fn handle(&self) -> ServerHandle {
138        ServerHandle {
139            sender: self.ws.broadcaster(),
140        }
141    }
142    pub fn run(self) {
143        log::info!("Starting the server");
144        match self.ws.run() {
145            Ok(_) => {
146                log::info!("Server finished successfully");
147            }
148            Err(e) => {
149                log::error!("Server shutdown with error: {}", e);
150                panic!("{e:?}");
151            }
152        }
153    }
154}