makepad_platform/
web_socket.rs

1#[allow(unused_imports)]
2use crate::{
3    os::OsWebSocket,
4    cx_api::*,
5    Cx,
6    studio::{AppToStudio,AppToStudioVec},
7    event::{HttpMethod,HttpRequest},
8    makepad_micro_serde::*
9};
10#[allow(unused_imports)]
11use std::{
12    time::{Instant, Duration},
13    collections::HashMap,
14    cell::RefCell,
15    sync::{
16        Mutex,
17        atomic::{AtomicU64, AtomicBool, Ordering},
18        mpsc::{channel, Sender, Receiver, RecvTimeoutError, TryRecvError,RecvError}
19    }
20};
21     
22#[derive(Debug)]
23pub enum WebSocketThreadMsg{
24    Open{
25        socket_id: u64,
26        request:HttpRequest,
27        rx_sender: Sender<WebSocketMessage>
28    },
29    Close{
30        socket_id: u64
31    },
32    SendMessage{
33        socket_id: u64,
34        message: WebSocketMessage
35    },
36    AppToStudio{
37        message: AppToStudio
38    },
39    Terminate
40}
41
42pub struct WebSocket{
43    socket_id: u64,
44    pub rx_receiver: Receiver<WebSocketMessage>,
45}
46
/// A message travelling over a [`WebSocket`] channel.
///
/// `Binary` and `String` are what `WebSocket::send_binary` and
/// `WebSocket::send_string` wrap outgoing payloads in. The remaining variants
/// are presumably produced by the platform socket implementation, which is
/// not visible in this file.
#[derive(Debug)]
pub enum WebSocketMessage {
    /// An error, described as text.
    Error(String),
    /// A binary payload.
    Binary(Vec<u8>),
    /// A text payload.
    String(String),
    /// NOTE(review): presumably signals that the connection is established.
    Opened,
    /// NOTE(review): presumably signals that the connection has ended.
    Closed,
}
55
56pub (crate) static WEB_SOCKET_THREAD_SENDER: Mutex<Option<Sender<WebSocketThreadMsg>>> = Mutex::new(None);
57pub (crate) static WEB_SOCKET_ID: AtomicU64 = AtomicU64::new(0);
58pub (crate) static HAS_STUDIO_WEB_SOCKET: AtomicBool = AtomicBool::new(false);
59
60impl Drop for WebSocket{
61    fn drop(&mut self){
62        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
63        if let Some(sender) = &*sender{
64            sender.send(WebSocketThreadMsg::Close{
65                socket_id: self.socket_id,
66            }).unwrap();
67        }
68    }
69}
70impl Cx{
71    pub fn has_studio_web_socket()->bool{ 
72       HAS_STUDIO_WEB_SOCKET.load(Ordering::SeqCst)
73    }
74    
75    #[cfg(target_arch = "wasm32")]
76    fn run_websocket_thread(&mut self){
77        let (rx_sender, rx_receiver) = channel();
78        let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
79        *thread_sender = Some(rx_sender);
80        let sockets = Mutex::new(RefCell::new(HashMap::new()));
81        self.spawn_timer_thread(16, move ||{ 
82            let mut app_to_studio = AppToStudioVec(Vec::new());
83            while let Ok(msg) = rx_receiver.try_recv(){
84                match msg{
85                    WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
86                        let socket = OsWebSocket::open(socket_id, request, rx_sender);
87                        sockets.lock().unwrap().borrow_mut().insert(socket_id, socket);
88                    }
89                    WebSocketThreadMsg::SendMessage{socket_id, message}=>{
90                        if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&socket_id){
91                            socket.send_message(message).unwrap();
92                        }
93                    }
94                    WebSocketThreadMsg::AppToStudio{message}=>{
95                        app_to_studio.0.push(message);
96
97                    }
98                    WebSocketThreadMsg::Close{socket_id}=>{
99                        sockets.lock().unwrap().borrow_mut().remove(&socket_id);
100                    }
101                    WebSocketThreadMsg::Terminate{}=>{
102                        for socket in sockets.lock().unwrap().borrow_mut().values_mut(){
103                            socket.close();
104                        }
105                        sockets.lock().unwrap().borrow_mut().clear();
106                    }
107                                        
108                }
109            }
110            if app_to_studio.0.len()>0{
111                if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&0){
112                    socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).unwrap()
113                }
114            }
115        });
116    }
117        
118    #[cfg(not(target_arch = "wasm32"))]
119    fn run_websocket_thread(&mut self){
120        // lets create a thread
121        let (rx_sender, rx_receiver) = channel();
122        let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
123        *thread_sender = Some(rx_sender);
124        self.spawn_thread(move ||{
125            // this is the websocket thread.
126            let mut sockets = HashMap::new();
127            let mut app_to_studio = AppToStudioVec(Vec::new());
128            let mut first_message = None;
129            let collect_time = Duration::from_millis(16);
130            let mut cycle_time = Duration::MAX;
131            loop{
132                // the idea is that this loop collects AppToStudio messages for a minimum of collect_time 
133                // and then batches it. this solves flooding underlying platform websocket overhead (esp on web)
134                match rx_receiver.recv_timeout(cycle_time){
135                    Ok(msg)=>match msg{
136                        WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
137                            let socket = OsWebSocket::open(socket_id, request, rx_sender);
138                            sockets.insert(socket_id, socket);
139                        }
140                        WebSocketThreadMsg::SendMessage{socket_id, message}=>{
141                            if let Some(socket) = sockets.get_mut(&socket_id){
142                                if socket.send_message(message).is_err(){
143                                    crate::log!("Websocket sender closed unexpectedly");
144                                    return
145                                }
146                            }
147                        }
148                        WebSocketThreadMsg::AppToStudio{message}=>{
149                            if first_message.is_none(){
150                                first_message = Some(Instant::now())
151                            }
152                            app_to_studio.0.push(message);
153                            cycle_time = collect_time; // we should now block with a max of collect time since we received the first message
154                        }
155                        WebSocketThreadMsg::Close{socket_id}=>{
156                            if let Some(socket) = sockets.get_mut(&socket_id){
157                                socket.close();
158                            }
159                            sockets.remove(&socket_id);
160                        }
161                        WebSocketThreadMsg::Terminate=>{
162                            for socket in sockets.values_mut(){
163                                socket.close();
164                            }
165                            *WEB_SOCKET_THREAD_SENDER.lock().unwrap() = None;
166                            return;
167                        }
168                    },
169                    Err(RecvTimeoutError::Timeout)=>{ 
170                    }
171                    Err(RecvTimeoutError::Disconnected)=>{
172                        return
173                    }
174                }
175                if let Some(first_time) = first_message{
176                    if Instant::now().duration_since(first_time) >= collect_time{
177                        // lets send it
178                        if let Some(socket) = sockets.get_mut(&0){
179                            if socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).is_err(){
180                                println!("Studio websocket disconnected!");
181                                // studio disconnected, just stop the threadloop
182                                break;
183                            };
184                        }
185                        app_to_studio.0.clear();
186                        first_message = None;
187                        cycle_time = Duration::MAX;
188                    }
189                }
190            }
191        });
192    }
193    
194    fn start_studio_websocket(&mut self, studio_http: &str) {
195        if studio_http.len() == 0{
196            return
197        }
198        self.studio_http = studio_http.into();
199        
200        #[cfg(all(not(target_os="tvos"), not(target_os="ios")))]{
201            // lets open a websocket
202            HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
203            let request = HttpRequest::new(studio_http.to_string(), HttpMethod::GET);
204            self.studio_web_socket = Some(WebSocket::open(request));
205        }
206        
207    }
208    
209    pub fn stop_studio_websocket(&mut self){
210        self.studio_web_socket = None;
211        HAS_STUDIO_WEB_SOCKET.store(false, Ordering::SeqCst);
212        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
213        if let Some(sender) = &*sender{
214            sender.send(WebSocketThreadMsg::Terminate).unwrap();
215        }
216    }
217    
218    
219    #[cfg(any(target_os="tvos", target_os="ios"))]
220    pub fn start_studio_websocket_delayed(&mut self) {
221        HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
222        let request = HttpRequest::new(self.studio_http.clone(), HttpMethod::GET);
223        self.studio_web_socket = Some(WebSocket::open(request));
224    }
225     
226    pub fn init_websockets(&mut self, studio_http: &str) {
227        self.run_websocket_thread();
228        self.start_studio_websocket(studio_http);
229    }
230    
231    pub fn send_studio_message(msg:AppToStudio){
232        if !Cx::has_studio_web_socket(){
233            return
234        }
235        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
236        if let Some(sender) = &*sender{
237            let _= sender.send(WebSocketThreadMsg::AppToStudio{message:msg});
238        }
239        else{
240            println!("Web socket thread not running (yet) for {:?}", msg);
241        }
242    }
243}
244
245impl WebSocket{    
246    
247    
248    pub fn open(request:HttpRequest)->WebSocket {
249        let (rx_sender, rx_receiver) = channel();
250        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
251        let socket_id = WEB_SOCKET_ID.fetch_add(1, Ordering::SeqCst);
252        if let Some(sender) = &*sender{
253            sender.send(WebSocketThreadMsg::Open{
254                socket_id,
255                rx_sender,
256                request
257            }).unwrap();
258            }
259        else{
260            panic!("Web socket thread not running")
261        }
262        WebSocket{
263            socket_id,
264            rx_receiver,
265        }
266    }
267    
268    pub fn send_binary(&mut self, data:Vec<u8>)->Result<(),()>{
269        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
270        if let Some(sender) = &*sender{
271            sender.send(WebSocketThreadMsg::SendMessage{
272                socket_id: self.socket_id,
273                message: WebSocketMessage::Binary(data),
274            }).map_err(|_|())
275        }
276        else{
277            panic!("Web socket thread not running")
278        }
279    }
280    
281    pub fn send_string(&mut self, data:String)->Result<(),()>{
282        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
283        if let Some(sender) = &*sender{
284            sender.send(WebSocketThreadMsg::SendMessage{
285                socket_id: self.socket_id,
286                message: WebSocketMessage::String(data),
287            }).map_err(|_|())
288        }
289        else{
290            panic!("Web socket thread not running")
291        }
292    }
293    
294    pub fn try_recv(&mut self)->Result<WebSocketMessage,TryRecvError>{
295        self.rx_receiver.try_recv()
296    }
297    
298    pub fn recv(&mut self)->Result<WebSocketMessage,RecvError>{
299        self.rx_receiver.recv()
300    }
301    
302}