1
2use futures::stream::SplitSink;
3use warp::Filter;
4use warp::ws::Message;
5use warp::ws::WebSocket;
6use warp::ws::Ws;
7
8use futures::stream::StreamExt;
9
10use core::fmt;
11
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use futures::SinkExt;
16
17use tokio::sync::broadcast::Sender as BroadcastSender;
18use tokio::sync::broadcast::Receiver as BroadcastReceiver;
19
20use tokio::sync::mpsc::Sender as SubscriptionSender;
21
22use tokio::sync::mpsc as MPSC;
23
24use futures::Future;
25
26use crate::Filemap;
27use crate::JSMessageTx;
28use crate::JSType;
29use crate::ui::BATCH_BEGIN;
30use crate::ui::BATCH_END;
31use crate::ui::CLOSE_REQUEST;
32use crate::ui::utils::get_extension_from_filename;
33use crate::ui_data::ROOT_ID;
34
35type MessageBuffer = Arc<Mutex<Vec<Message>>>;
36
37pub struct WSServer {
38 filemap: Arc<Mutex<Filemap>>,
39 port: u16,
40 client_tx: BroadcastSender<Message>,
41 buffer: MessageBuffer,
42 subscription_sender: SubscriptionSender<String>
43}
44
45
46impl fmt::Debug for WSServer {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 let buf = self.buffer.lock().unwrap();
49 f.debug_struct("Server")
50 .field("port", &self.port)
51 .field("message queue size", &buf.len())
52 .finish()
53 }
54}
55
56#[derive(Clone)]
57pub (crate) struct MsgTx {
58 tx: BroadcastSender<Message>,
59}
60
61impl MsgTx {
62 pub (crate) fn send(&self, msg: String) {
63 if self.tx.receiver_count() > 0 {
64 let tx = self.tx.clone();
65 tokio::spawn(MsgTx::do_send(tx, msg));
66 }
67 }
68
69
70 pub (crate) fn send_bin(&self, bin: Vec<u8>) {
71 if self.tx.receiver_count() > 0 {
72 let tx = self.tx.clone();
73 tokio::spawn(MsgTx::do_send_bin(tx, bin));
74 }
75 }
76}
77
78
79impl MsgTx {
81 async fn do_send(tx: BroadcastSender<Message>, msg: String) {
82 tx.send(Message::text(&msg)).unwrap_or_else(|e| {
83 let obj = serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&msg);
86 if obj.is_err()
87 || !obj.as_ref().unwrap().contains_key("type")
88 || obj.as_ref().unwrap()["type"].to_string() != format!("\"{CLOSE_REQUEST}\"")
89 || tx.receiver_count() != 0 {
90 eprintln!("Channel error {e} on {msg} tx count: {}", tx.receiver_count());
91 }
92 0
93 });
94 }
95
96 async fn do_send_bin(tx: BroadcastSender<Message>, msg: Vec<u8>) {
97 tx.send(Message::binary(msg)).unwrap_or_else(|e|{
98 eprintln!("Channel error {e}");
99 0
100 });
101 }
102}
103
/// Text of the control message that marks the end of the early-message phase.
pub(crate) static ENTERED: &str = "entered";
107
108async fn wait_early_messages(msg_queue: MessageBuffer, mut rx: BroadcastReceiver<Message>) {
109 let mut entered = false;
110 while ! entered {
111 let msg = rx.recv().await;
112 match msg {
113 Ok(msg) => {
114 let mut queue = msg_queue.lock().unwrap();
115 if msg.is_text() && msg.to_str().unwrap() == ENTERED {
116 entered = true;
117 }
118 if !entered {
119 queue.push(msg);
120 }
121 }
122 Err(e) => {
123 eprintln!("Cannot read {e}");
124 break;
125 }
126 }
127 }
128}
129
130fn write_to_buffer(msg: Message, buffer: &MessageBuffer) {
131 let mut buf = buffer.lock().unwrap();
132 debug_assert!(msg.is_text());
133 buf.push(msg);
134}
135
136pub(super) fn new(filemap: Arc<Mutex<Filemap>>, port: u16, subscription_sender: SubscriptionSender<String>) -> WSServer {
137 let (client_tx, buffer_rx) = tokio::sync::broadcast::channel(64);
138 let buffer = Arc::new(Mutex::new(Vec::new()));
140 tokio::spawn(wait_early_messages(buffer.clone(), buffer_rx));
142
143 WSServer {
144 filemap,
145 port,
146 client_tx,
147 buffer,
148 subscription_sender,
149 }
150}
151
152
153impl WSServer {
154
155 pub (crate) fn sender(&self) -> MsgTx {
156 MsgTx{tx: self.client_tx.clone()}
157 }
158
159 pub (crate) fn port(&self) -> u16 {
160 self.port
161 }
162
163 fn take_as_msg( buffer: &MessageBuffer) -> (Vec<JSType>, Vec<Vec<u8>>) {
167 let buf = buffer.lock().unwrap();
168 let mut vec_txt = Vec::new();
169 let mut vec_bin = Vec::new();
170 for msg in buf.iter() {
171 if msg.is_text() {
172 let s = msg.to_str().unwrap();
173 if s == BATCH_BEGIN || s == BATCH_END {continue;}
174 let json = serde_json::from_str(s).unwrap_or_else(
175 |e|panic!("Invalid Json '{s}': {e}"));
176 vec_txt.push(json);
177 } else {
178 vec_bin.push(msg.as_bytes().to_vec());
179 }
180 }
181 (vec_txt, vec_bin)
182 }
183
184 fn clear_buffer( buffer: &MessageBuffer) {
185 let mut buf = buffer.lock().unwrap();
186 buf.clear();
187 }
188
189 async fn send_buffered(sender: &mut SplitSink<WebSocket, Message>, buffer: &MessageBuffer) {
190 let (msg_buffer, msg_bin) = Self::take_as_msg(buffer);
191 let msg = JSMessageTx {
192 element: ROOT_ID,
193 _type: "batch",
194 batches: Some(&msg_buffer),
195 ..Default::default()
196 };
197 let json = serde_json::to_string(&msg).unwrap();
198 sender.send(Message::text(json)).await.unwrap_or_else(|e| eprintln!("Cannot send {e}"));
199 for item in msg_bin {
201 sender.send(Message::binary(item)).await.unwrap_or_else(|e| eprintln!("Cannot send {e}"));
202 }
203 }
204
205 async fn handle_ws_client(websocket: WebSocket,
206 client_tx: BroadcastSender<Message>,
207 buffer: MessageBuffer,
208 subscription_sender: SubscriptionSender<String>,
209 exit_tx: MPSC::Sender<bool>,
210 is_gui: bool) {
211 let (mut sender, mut receiver) = websocket.split();
214
215 let mut do_buffer = false; let mut client_rx = client_tx.subscribe();
218
219 loop {
220 tokio::select! {
221 Some(ws_msg) = receiver.next() => {
222 match ws_msg {
223 Ok(msg) => {
224 if msg.is_text() {
225 let txt = String::from(msg.to_str().unwrap());
226 subscription_sender.send(txt).await.unwrap();
227 } else if msg.is_close() {
228 if let Some(cf) = msg.close_frame() {
230 if cf.0 != 1001 {
231 eprintln!("Closed code:{} std:{}", cf.0, cf.1);
232 }
233 }
234 let close = format!("{{\"type\": \"{CLOSE_REQUEST}\"}}");
235 subscription_sender.send(close).await.unwrap();
236 break;
237 } else if msg.is_ping() {
238 } else {
240 eprintln!("Unexpected message type: {msg:#?}");
241 }
242 },
243 Err(error) => {
244 if ! error.to_string().contains("Connection reset without closing handshake") {
245 eprintln!("error reading message on websocket: {error}");
246 }
247 break;
248 }
249 };
250 },
251 cl_msg = client_rx.recv() => {
252 match cl_msg {
253 Ok(msg) => {
254 if msg.is_text() && msg.to_str().unwrap() == ENTERED {
255 Self::send_buffered(&mut sender, &buffer).await;
256 let started = String::from("{\"type\": \"start_request\"}");
257 subscription_sender.send(started).await.unwrap();
258 } else if msg.is_text() && msg.to_str().unwrap() == BATCH_BEGIN {
259 if ! do_buffer {
260 Self::clear_buffer(&buffer);
261 do_buffer = true;
262 }
263 } else if msg.is_text() && msg.to_str().unwrap() == BATCH_END {
264 if do_buffer {
265 Self::send_buffered(&mut sender, &buffer).await;
266 do_buffer = false;
267 }
268 } else if msg.is_text() && do_buffer {
269 write_to_buffer(msg, &buffer);
270 } else if msg.is_text() || is_gui { sender.send(msg).await.unwrap_or_else(|e| eprintln!("Cannot send msg: {e}"));
272 }
273 },
274 Err(e) => {
275 eprintln!("error reading message from element: {e}");
276 }
277 };
278 },
279 }
280 }
281 exit_tx.send(true).await.expect("Error in exit");
282 }
283
284 pub async fn run<F, Fut>(&self, on_start: F) -> Option<tokio::task::JoinHandle<()>>
286 where F: FnOnce(u16) -> Fut + Send + 'static,
287 Fut: Future<Output = bool> + Send + 'static {
288
289 let fm = self.filemap.clone();
290
291 let get_routes =
296 warp::get()
297 .and(warp::path::tail()
298 .map(move |path: warp::path::Tail| {
299 let name = path.as_str();
300 let file_map = fm.lock().unwrap();
301 assert!(file_map.contains_key(name), "Request not found: {name:#?}");
302
303 let mime = Self::file_to_mime(name).unwrap_or("octect-stream");
304
305 warp::reply::with_header(file_map[name].clone(), "content-type", mime)
306
307 }));
308
309 let (exit_tx, mut exit_rx) = MPSC::channel(32);
310
311
312 let buffer = self.buffer.clone();
313 let client_tx = self.client_tx.clone();
314 let subscription_sender = self.subscription_sender.clone();
315
316 let ui_route = warp::ws()
317 .and(warp::path("gemgui"))
318 .and(warp::path::param())
319 .map( move |ws: Ws, name: String| {
320 let buffer = buffer.clone();
321 let client_tx = client_tx.clone();
322 let subscription_sender = subscription_sender.clone();
323 let exit_tx = exit_tx.clone();
324 let is_gui = name != "extension";
325 ws.on_upgrade( move |websocket: WebSocket| {
326 Self::handle_ws_client(websocket, client_tx, buffer, subscription_sender, exit_tx, is_gui)
327 })
328 });
329
330 let all_routes = ui_route
331 .or(get_routes);
332
333 let (_, fut) = warp::serve(all_routes)
334 .bind_with_graceful_shutdown(([127, 0, 0, 1], self.port), async move {
335 tokio::select! {
336 Some(_) = exit_rx.recv() => {}
337 }
338 });
339
340 let fut_srv = tokio::spawn(fut);
341
342 if ! on_start(self.port).await {
344 eprintln!("Start failed, exit");
345 return None; }
347
348 Some(fut_srv)
349
350 }
351
352 fn file_to_mime(filename: &str) -> Option<&str>{
353 let ext = get_extension_from_filename(filename)?;
354 let ext = ext.to_ascii_lowercase();
355 let ext = ext.as_str();
356
357 static MAP: phf::Map<&'static str, &'static str> = phf::phf_map! {
358 "html" => "text/html;charset=utf-8",
359 "css" => "text/css;charset=utf-8",
360 "js" => "text/javascript;charset=utf-8",
361 "txt" => "text/txt;charset=utf-8",
362 "ico" => "image/x-icon",
363 "png" => "image/png",
364 "jpg" => "image/jpeg",
365 "gif" => "image/gif",
366 "svg" => "image/svg+xml"
367 };
368
369 match MAP.get(ext) {
370 Some(v) => Some(*v),
371 None => None,
372 }
373 }
374
375}
376
377
378