1use super::*;
2
3use std::sync::Mutex;
4
5pub trait App: Send + 'static {
6 type Client: Receiver<Self::ClientMessage>;
7 type ServerMessage: Message;
8 type ClientMessage: Message;
9 fn connect(&mut self, sender: Box<dyn Sender<Self::ServerMessage>>) -> Self::Client;
10}
11
12struct Handler<T: App> {
13 app: Arc<Mutex<T>>,
14 sender: ws::Sender,
15 client: Option<T::Client>,
16}
17
18struct BackgroundSender {
19 sender: std::sync::mpsc::Sender<Arc<Vec<u8>>>,
20}
21
22impl BackgroundSender {
23 fn new(ws_sender: ws::Sender) -> Self {
24 let (sender, receiver) = std::sync::mpsc::channel::<Arc<Vec<u8>>>();
25 std::thread::spawn(move || {
26 while let Ok(data) = receiver.recv() {
27 ws_sender
28 .send(ws::Message::Binary((*data).clone()))
29 .expect("Failed to send message");
30 }
31 });
32 Self { sender }
33 }
34}
35
36impl<T: Message> Sender<T> for BackgroundSender {
37 fn send_serialized(&mut self, data: Arc<Vec<u8>>) {
38 self.sender.send(data).expect("Failed to send message");
39 }
40}
41
42impl<T: App> ws::Handler for Handler<T> {
43 fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
44 self.client = Some(
45 self.app
46 .lock()
47 .unwrap()
48 .connect(Box::new(BackgroundSender::new(self.sender.clone()))),
49 );
50 Ok(())
51 }
52 fn on_message(&mut self, message: ws::Message) -> ws::Result<()> {
53 let message = match deserialize_message(&message.into_data()) {
54 Ok(message) => message,
55 Err(e) => {
56 return Err(ws::Error::new(ws::ErrorKind::Protocol, e.to_string()));
57 }
58 };
59 log::trace!("Received message from client: {:?}", message);
60 if let Some(client) = &mut self.client {
61 client.handle(message);
62 } else {
63 log::error!("WUT! received a message before handshake");
64 }
65 Ok(())
66 }
67}
68
69struct Factory<T: App> {
70 app: Arc<Mutex<T>>,
71}
72
73impl<T: App> Factory<T> {
74 fn new(app: T) -> Self {
75 Self {
76 app: Arc::new(Mutex::new(app)),
77 }
78 }
79}
80
81impl<T: App> ws::Factory for Factory<T> {
82 type Handler = Handler<T>;
83
84 fn connection_made(&mut self, sender: ws::Sender) -> Handler<T> {
85 log::info!("New connection");
86 Handler {
87 app: self.app.clone(),
88 sender,
89 client: None,
90 }
91 }
92}
93
94pub struct Server<T: App> {
95 ws: ws::WebSocket<Factory<T>>,
96}
97
98#[derive(Clone)]
99pub struct ServerHandle {
100 sender: ws::Sender,
101}
102
103impl ServerHandle {
104 pub fn shutdown(&self) {
105 self.sender.shutdown().expect("Failed to shutdown server");
106 }
107}
108
109impl<T: App> Server<T> {
110 pub fn new(app: T, addr: impl std::net::ToSocketAddrs + Debug + Copy) -> Self {
111 let factory = Factory::new(app);
112 let ws = ws::Builder::new()
113 .with_settings(ws::Settings {
114 max_connections: 10000,
115 panic_on_internal: false,
123 tcp_nodelay: true,
124 ..Default::default()
125 })
126 .build(factory)
127 .unwrap();
128 let ws = match ws.bind(addr) {
129 Ok(ws) => ws,
130 Err(e) => {
131 log::error!("Failed to bind server to {:?}: {}", addr, e);
132 panic!("{e:?}");
133 }
134 };
135 Self { ws }
136 }
137 pub fn handle(&self) -> ServerHandle {
138 ServerHandle {
139 sender: self.ws.broadcaster(),
140 }
141 }
142 pub fn run(self) {
143 log::info!("Starting the server");
144 match self.ws.run() {
145 Ok(_) => {
146 log::info!("Server finished successfully");
147 }
148 Err(e) => {
149 log::error!("Server shutdown with error: {}", e);
150 panic!("{e:?}");
151 }
152 }
153 }
154}