gemgui/ui/
server.rs

1
2use futures::stream::SplitSink;
3use warp::Filter;
4use warp::ws::Message;
5use warp::ws::WebSocket;
6use warp::ws::Ws;
7
8use futures::stream::StreamExt;
9
10use core::fmt;
11
12use std::sync::Arc;
13use std::sync::Mutex;
14
15use futures::SinkExt;
16
17use tokio::sync::broadcast::Sender as BroadcastSender;
18use tokio::sync::broadcast::Receiver as BroadcastReceiver;
19
20use tokio::sync::mpsc::Sender as SubscriptionSender;
21
22use tokio::sync::mpsc as MPSC;
23
24use futures::Future;
25
26use crate::Filemap;
27use crate::JSMessageTx;
28use crate::JSType;
29use crate::ui::BATCH_BEGIN;
30use crate::ui::BATCH_END;
31use crate::ui::CLOSE_REQUEST;
32use crate::ui::utils::get_extension_from_filename;
33use crate::ui_data::ROOT_ID;
34
35type MessageBuffer = Arc<Mutex<Vec<Message>>>;
36
37pub struct WSServer {
38    filemap: Arc<Mutex<Filemap>>,
39    port: u16,
40    client_tx: BroadcastSender<Message>,
41    buffer: MessageBuffer,
42    subscription_sender: SubscriptionSender<String>
43}
44
45
46impl fmt::Debug for WSServer {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        let buf = self.buffer.lock().unwrap();
49        f.debug_struct("Server")
50        .field("port", &self.port)
51        .field("message queue size", &buf.len())
52        .finish()
53    }
54}
55
56#[derive(Clone)]
57pub (crate) struct MsgTx {
58    tx: BroadcastSender<Message>,
59} 
60
61impl MsgTx {
62    pub (crate) fn send(&self, msg: String) {
63            if self.tx.receiver_count() > 0 {
64                let tx = self.tx.clone();
65                tokio::spawn(MsgTx::do_send(tx, msg));
66            }
67        }
68
69        
70    pub (crate) fn send_bin(&self, bin: Vec<u8>) {
71        if self.tx.receiver_count() > 0 {
72            let tx = self.tx.clone();
73            tokio::spawn(MsgTx::do_send_bin(tx, bin));
74        }
75    }
76}
77   
78
79// sends message from element to socket server
80impl MsgTx {
81    async fn do_send(tx: BroadcastSender<Message>, msg: String) {
82        tx.send(Message::text(&msg)).unwrap_or_else(|e| {
83            // somewhat heavy error handing only upon error - then we look if 
84            // it was on close_exit and ignore if channels are already closed
85            let obj = serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&msg);
86            if obj.is_err() 
87            || !obj.as_ref().unwrap().contains_key("type") 
88            || obj.as_ref().unwrap()["type"].to_string() != format!("\"{CLOSE_REQUEST}\"") 
89            || tx.receiver_count() != 0 {  
90                eprintln!("Channel error {e} on {msg} tx count: {}", tx.receiver_count());
91            }
92            0
93        });
94    }
95    
96    async fn do_send_bin(tx: BroadcastSender<Message>, msg: Vec<u8>) {
97        tx.send(Message::binary(msg)).unwrap_or_else(|e|{
98            eprintln!("Channel error {e}");
99            0
100        });
101    }  
102}
103
104// receive message from element
105
106pub (crate) static ENTERED: &str = "entered";
107
108async fn wait_early_messages(msg_queue: MessageBuffer, mut rx: BroadcastReceiver<Message>) {
109    let mut entered = false;
110    while ! entered  {
111        let msg = rx.recv().await;
112        match msg {
113            Ok(msg) => {
114                let mut queue = msg_queue.lock().unwrap();
115                if msg.is_text() && msg.to_str().unwrap() == ENTERED {
116                    entered = true;
117                }
118                if !entered {
119                    queue.push(msg);
120                }
121            }
122            Err(e) => {
123                eprintln!("Cannot read {e}");
124                break;
125            }
126        }
127    }
128}
129
130fn write_to_buffer(msg: Message, buffer: &MessageBuffer) {
131    let mut buf = buffer.lock().unwrap();
132    debug_assert!(msg.is_text());
133    buf.push(msg);
134}
135
136pub(super) fn new(filemap: Arc<Mutex<Filemap>>, port: u16, subscription_sender: SubscriptionSender<String>) -> WSServer {
137    let (client_tx, buffer_rx) = tokio::sync::broadcast::channel(64);
138    // we need a buffer where to copy message before  ws is open
139    let buffer = Arc::new(Mutex::new(Vec::new()));
140    // messages are listed in own loop, that let us also handle messages from user
141    tokio::spawn(wait_early_messages(buffer.clone(), buffer_rx));
142
143    WSServer {
144        filemap,
145        port,
146        client_tx,
147        buffer,
148        subscription_sender,
149    }
150}
151
152
153impl WSServer {
154
155    pub (crate) fn sender(&self) ->  MsgTx {
156        MsgTx{tx: self.client_tx.clone()}
157    }
158
159    pub (crate) fn port(&self) -> u16 {
160        self.port
161    }
162
163    // Mutex protected data accessor, we cannot use await in iteration (for) loop
164    // as then mutex is locked and async-send cannot happen
165    // Just clone data from index 
166    fn take_as_msg( buffer: &MessageBuffer) -> (Vec<JSType>, Vec<Vec<u8>>) {
167        let buf = buffer.lock().unwrap();
168        let mut vec_txt = Vec::new();
169        let mut vec_bin = Vec::new();
170        for msg in buf.iter() {
171            if msg.is_text() {
172                let s = msg.to_str().unwrap();
173                if s == BATCH_BEGIN || s == BATCH_END {continue;}
174                let json = serde_json::from_str(s).unwrap_or_else(
175                    |e|panic!("Invalid Json '{s}': {e}"));
176                vec_txt.push(json);
177            } else {
178                vec_bin.push(msg.as_bytes().to_vec());
179            }
180        }
181        (vec_txt, vec_bin)    
182    }
183
184    fn clear_buffer( buffer: &MessageBuffer) {
185        let mut buf = buffer.lock().unwrap();
186        buf.clear();
187    }
188
189    async fn send_buffered(sender: &mut SplitSink<WebSocket, Message>, buffer: &MessageBuffer) { 
190        let (msg_buffer, msg_bin) = Self::take_as_msg(buffer);
191        let msg = JSMessageTx {
192            element: ROOT_ID,
193            _type: "batch",
194            batches: Some(&msg_buffer),
195            ..Default::default()
196        };
197        let json = serde_json::to_string(&msg).unwrap();
198        sender.send(Message::text(json)).await.unwrap_or_else(|e| eprintln!("Cannot send {e}"));
199        // binary messages cannot sent as batch 
200        for item in msg_bin {
201            sender.send(Message::binary(item)).await.unwrap_or_else(|e| eprintln!("Cannot send {e}"));
202        }
203    }
204
205    async fn handle_ws_client(websocket: WebSocket,
206        client_tx: BroadcastSender<Message>,
207        buffer: MessageBuffer,
208        subscription_sender: SubscriptionSender<String>,
209        exit_tx: MPSC::Sender<bool>,
210        is_gui: bool) {
211        // receiver - this server, from websocket client
212        // sender - diff clients connected to this server
213        let (mut sender, mut receiver) = websocket.split();
214
215        let mut do_buffer = false; // at start we buffer
216
217        let mut client_rx = client_tx.subscribe();
218
219        loop {
220            tokio::select! {
221                Some(ws_msg) = receiver.next() => {
222                    match ws_msg {
223                        Ok(msg) => {
224                            if msg.is_text() {
225                                let txt = String::from(msg.to_str().unwrap());
226                                subscription_sender.send(txt).await.unwrap();
227                            } else if msg.is_close() {
228                                // tell ui.rs to leave the loop - Json constant...
229                                if let Some(cf) = msg.close_frame() {
230                                    if cf.0 != 1001 {
231                                        eprintln!("Closed code:{} std:{}", cf.0, cf.1);
232                                    }
233                                }
234                                let close = format!("{{\"type\": \"{CLOSE_REQUEST}\"}}"); 
235                                subscription_sender.send(close).await.unwrap();
236                                break;  
237                            } else if msg.is_ping() {
238                                // wont response to pong, underneath should do it   
239                            } else {
240                                eprintln!("Unexpected message type: {msg:#?}");
241                            }
242                        },
243                        Err(error) => {
244                            if ! error.to_string().contains("Connection reset without closing handshake") {  
245                                eprintln!("error reading message on websocket: {error}");
246                            }
247                            break;
248                        }
249                    };   
250                },
251                cl_msg = client_rx.recv() => {
252                     match cl_msg {
253                        Ok(msg) => {
254                            if msg.is_text() && msg.to_str().unwrap() == ENTERED {
255                                Self::send_buffered(&mut sender, &buffer).await;
256                                let started = String::from("{\"type\": \"start_request\"}"); 
257                                subscription_sender.send(started).await.unwrap();
258                            } else if msg.is_text() && msg.to_str().unwrap() == BATCH_BEGIN  {
259                                if ! do_buffer {
260                                    Self::clear_buffer(&buffer);
261                                    do_buffer = true;
262                                }
263                            } else if msg.is_text() && msg.to_str().unwrap() == BATCH_END  {
264                                if do_buffer {
265                                    Self::send_buffered(&mut sender, &buffer).await;
266                                    do_buffer = false;
267                                }
268                            } else if msg.is_text() && do_buffer {
269                                write_to_buffer(msg, &buffer);        
270                            } else if msg.is_text() || is_gui { // binary data is not sent to extension
271                                sender.send(msg).await.unwrap_or_else(|e| eprintln!("Cannot send msg: {e}"));
272                            }
273                        },
274                        Err(e) => {
275                            eprintln!("error reading message from element: {e}");
276                        }
277                    };   
278                },   
279            }
280        }
281        exit_tx.send(true).await.expect("Error in exit");
282    }
283
284    /// Run 
285    pub async fn run<F, Fut>(&self, on_start: F) -> Option<tokio::task::JoinHandle<()>>
286    where   F: FnOnce(u16) -> Fut + Send + 'static,
287            Fut: Future<Output = bool> + Send + 'static {
288
289        let fm = self.filemap.clone();
290        
291        // Sigh there is not compile time warning while writing, this
292        // but this is quite fragile, bad things happens if
293        // name is not in fm - should be rewritten
294        // how to add keys to paths?
295        let  get_routes = 
296        warp::get()
297        .and(warp::path::tail()
298        .map(move |path: warp::path::Tail|  {
299            let name = path.as_str();
300            let file_map = fm.lock().unwrap();
301            assert!(file_map.contains_key(name), "Request not found: {name:#?}");
302
303            let mime = Self::file_to_mime(name).unwrap_or("octect-stream");
304            
305            warp::reply::with_header(file_map[name].clone(), "content-type", mime)
306            
307        }));
308
309        let (exit_tx, mut exit_rx) = MPSC::channel(32);
310
311
312        let buffer = self.buffer.clone();
313        let client_tx = self.client_tx.clone();
314        let subscription_sender = self.subscription_sender.clone();
315        
316        let ui_route = warp::ws()
317        .and(warp::path("gemgui"))
318        .and(warp::path::param())
319        .map( move |ws: Ws, name: String| {
320            let buffer = buffer.clone();
321            let client_tx = client_tx.clone();
322            let subscription_sender = subscription_sender.clone();
323            let exit_tx = exit_tx.clone();
324            let is_gui = name != "extension";
325            ws.on_upgrade( move |websocket: WebSocket| {
326                Self::handle_ws_client(websocket, client_tx, buffer, subscription_sender, exit_tx, is_gui)
327            })
328        });
329
330          let all_routes = ui_route
331        .or(get_routes);
332     
333        let (_, fut) = warp::serve(all_routes)
334            .bind_with_graceful_shutdown(([127, 0, 0, 1], self.port),  async move {
335                tokio::select! {
336                    Some(_) = exit_rx.recv() => {}
337                }
338            });
339
340        let fut_srv = tokio::spawn(fut);
341
342        // Start browser Ui after server is spawned
343        if ! on_start(self.port).await {
344            eprintln!("Start failed, exit");
345            return None; // early end
346        }
347        
348        Some(fut_srv)
349
350    }
351
352    fn file_to_mime(filename: &str) -> Option<&str>{
353        let ext = get_extension_from_filename(filename)?;
354        let ext = ext.to_ascii_lowercase();
355        let ext = ext.as_str();
356
357        static MAP: phf::Map<&'static str, &'static str> = phf::phf_map! {
358            "html" => "text/html;charset=utf-8",
359            "css" => "text/css;charset=utf-8",
360            "js" => "text/javascript;charset=utf-8",
361            "txt" => "text/txt;charset=utf-8",
362            "ico" => "image/x-icon",
363            "png" => "image/png",
364            "jpg" => "image/jpeg",
365            "gif" => "image/gif",
366            "svg" => "image/svg+xml"
367        };
368
369        match MAP.get(ext) {
370            Some(v) => Some(*v),
371            None => None,
372        }
373    }
374
375} 
376
377
378