makepad_platform/
web_socket.rs

1#[allow(unused_imports)]
2use crate::{
3    os::OsWebSocket,
4    cx_api::*,
5    Cx,
6    studio::{AppToStudio,AppToStudioVec},
7    event::{HttpMethod,HttpRequest},
8    makepad_micro_serde::*
9};
10#[allow(unused_imports)]
11use std::{
12    time::{Instant, Duration},
13    collections::HashMap,
14    cell::RefCell,
15    sync::{
16        Mutex,
17        atomic::{AtomicU64, AtomicBool, Ordering},
18        mpsc::{channel, Sender, Receiver, RecvTimeoutError, TryRecvError,RecvError}
19    }
20};
21     
22#[derive(Debug)]
23pub enum WebSocketThreadMsg{
24    Open{
25        socket_id: u64,
26        request:HttpRequest,
27        rx_sender: Sender<WebSocketMessage>
28    },
29    Close{
30        socket_id: u64
31    },
32    SendMessage{
33        socket_id: u64,
34        message: WebSocketMessage
35    },
36    AppToStudio{
37        message: AppToStudio
38    },
39    Terminate
40}
41
42pub struct WebSocket{
43    socket_id: u64,
44    pub rx_receiver: Receiver<WebSocketMessage>,
45}
46
47#[derive(Debug)]
48pub enum WebSocketMessage{
49    Error(String),
50    Binary(Vec<u8>),
51    String(String),
52    Opened,
53    Closed
54}
55
56pub (crate) static WEB_SOCKET_THREAD_SENDER: Mutex<Option<Sender<WebSocketThreadMsg>>> = Mutex::new(None);
57pub (crate) static WEB_SOCKET_ID: AtomicU64 = AtomicU64::new(0);
58pub (crate) static HAS_STUDIO_WEB_SOCKET: AtomicBool = AtomicBool::new(false);
59
60impl Drop for WebSocket{
61    fn drop(&mut self){
62        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
63        if let Some(sender) = &*sender{
64            sender.send(WebSocketThreadMsg::Close{
65                socket_id: self.socket_id,
66            }).unwrap();
67        }
68    }
69}
70impl Cx{
71    pub fn has_studio_web_socket()->bool{ 
72       HAS_STUDIO_WEB_SOCKET.load(Ordering::SeqCst)
73    }
74    
75    #[cfg(target_arch = "wasm32")]
76    fn run_websocket_thread(&mut self){
77        let (rx_sender, rx_receiver) = channel();
78        let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
79        *thread_sender = Some(rx_sender);
80        let sockets = Mutex::new(RefCell::new(HashMap::new()));
81        self.spawn_timer_thread(16, move ||{ 
82            let mut app_to_studio = AppToStudioVec(Vec::new());
83            while let Ok(msg) = rx_receiver.try_recv(){
84                match msg{
85                    WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
86                        let socket = OsWebSocket::open(socket_id, request, rx_sender);
87                        sockets.lock().unwrap().borrow_mut().insert(socket_id, socket);
88                    }
89                    WebSocketThreadMsg::SendMessage{socket_id, message}=>{
90                        if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&socket_id){
91                            socket.send_message(message).unwrap();
92                        }
93                    }
94                    WebSocketThreadMsg::AppToStudio{message}=>{
95                        app_to_studio.0.push(message);
96
97                    }
98                    WebSocketThreadMsg::Close{socket_id}=>{
99                        sockets.lock().unwrap().borrow_mut().remove(&socket_id);
100                    }
101                    WebSocketThreadMsg::Terminate{}=>{
102                        for socket in sockets.lock().unwrap().borrow_mut().values_mut(){
103                            socket.close();
104                        }
105                        sockets.lock().unwrap().borrow_mut().clear();
106                    }
107                                        
108                }
109            }
110            if app_to_studio.0.len()>0{
111                if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&0){
112                    socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).unwrap()
113                }
114            }
115        });
116    }
117        
118    #[cfg(not(target_arch = "wasm32"))]
119    fn run_websocket_thread(&mut self){
120        // lets create a thread
121        let (rx_sender, rx_receiver) = channel();
122        let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
123        *thread_sender = Some(rx_sender);
124        self.spawn_thread(move ||{
125            // this is the websocket thread.
126            let mut sockets = HashMap::new();
127            let mut app_to_studio = AppToStudioVec(Vec::new());
128            let mut first_message = None;
129            let collect_time = Duration::from_millis(16);
130            let mut cycle_time = Duration::MAX;
131            loop{
132                // the idea is that this loop collects AppToStudio messages for a minimum of collect_time 
133                // and then batches it. this solves flooding underlying platform websocket overhead (esp on web)
134                match rx_receiver.recv_timeout(cycle_time){
135                    Ok(msg)=>match msg{
136                        WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
137                            let socket = OsWebSocket::open(socket_id, request, rx_sender);
138                            sockets.insert(socket_id, socket);
139                        }
140                        WebSocketThreadMsg::SendMessage{socket_id, message}=>{
141                            if let Some(socket) = sockets.get_mut(&socket_id){
142                                socket.send_message(message).unwrap();
143                            }
144                        }
145                        WebSocketThreadMsg::AppToStudio{message}=>{
146                            if first_message.is_none(){
147                                first_message = Some(Instant::now())
148                            }
149                            app_to_studio.0.push(message);
150                            cycle_time = collect_time; // we should now block with a max of collect time since we received the first message
151                        }
152                        WebSocketThreadMsg::Close{socket_id}=>{
153                            if let Some(socket) = sockets.get_mut(&socket_id){
154                                socket.close();
155                            }
156                            sockets.remove(&socket_id);
157                        }
158                        WebSocketThreadMsg::Terminate=>{
159                            for socket in sockets.values_mut(){
160                                socket.close();
161                            }
162                            *WEB_SOCKET_THREAD_SENDER.lock().unwrap() = None;
163                            return;
164                        }
165                    },
166                    Err(RecvTimeoutError::Timeout)=>{ 
167                    }
168                    Err(RecvTimeoutError::Disconnected)=>{
169                        return
170                    }
171                }
172                if let Some(first_time) = first_message{
173                    if Instant::now().duration_since(first_time) >= collect_time{
174                        // lets send it
175                        if let Some(socket) = sockets.get_mut(&0){
176                            socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).unwrap();
177                        }
178                        app_to_studio.0.clear();
179                        first_message = None;
180                        cycle_time = Duration::MAX;
181                    }
182                }
183            }
184        });
185    }
186    
187    fn start_studio_websocket(&mut self, studio_http: &str) {
188        if studio_http.len() == 0{
189            return
190        }
191        self.studio_http = studio_http.into();
192        
193        #[cfg(all(not(target_os="tvos"), not(target_os="ios")))]{
194            // lets open a websocket
195            HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
196            let request = HttpRequest::new(studio_http.to_string(), HttpMethod::GET);
197            self.studio_web_socket = Some(WebSocket::open(request));
198        }
199        
200    }
201    
202    pub fn stop_studio_websocket(&mut self){
203        self.studio_web_socket = None;
204        HAS_STUDIO_WEB_SOCKET.store(false, Ordering::SeqCst);
205        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
206        if let Some(sender) = &*sender{
207            sender.send(WebSocketThreadMsg::Terminate).unwrap();
208        }
209    }
210    
211    
212    #[cfg(any(target_os="tvos", target_os="ios"))]
213    pub fn start_studio_websocket_delayed(&mut self) {
214        HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
215        let request = HttpRequest::new(self.studio_http.clone(), HttpMethod::GET);
216        self.studio_web_socket = Some(WebSocket::open(request));
217    }
218     
219    pub fn init_websockets(&mut self, studio_http: &str) {
220        self.run_websocket_thread();
221        self.start_studio_websocket(studio_http);
222    }
223    
224    pub fn send_studio_message(msg:AppToStudio){
225        if !Cx::has_studio_web_socket(){
226            return
227        }
228        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
229        if let Some(sender) = &*sender{
230            let _= sender.send(WebSocketThreadMsg::AppToStudio{message:msg});
231        }
232        else{
233            println!("Web socket thread not running (yet) for {:?}", msg);
234        }
235    }
236}
237
238impl WebSocket{    
239    
240    
241    pub fn open(request:HttpRequest)->WebSocket {
242        let (rx_sender, rx_receiver) = channel();
243        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
244        let socket_id = WEB_SOCKET_ID.fetch_add(1, Ordering::SeqCst);
245        if let Some(sender) = &*sender{
246            sender.send(WebSocketThreadMsg::Open{
247                socket_id,
248                rx_sender,
249                request
250            }).unwrap();
251            }
252        else{
253            panic!("Web socket thread not running")
254        }
255        WebSocket{
256            socket_id,
257            rx_receiver,
258        }
259    }
260    
261    pub fn send_binary(&mut self, data:Vec<u8>)->Result<(),()>{
262        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
263        if let Some(sender) = &*sender{
264            sender.send(WebSocketThreadMsg::SendMessage{
265                socket_id: self.socket_id,
266                message: WebSocketMessage::Binary(data),
267            }).map_err(|_|())
268        }
269        else{
270            panic!("Web socket thread not running")
271        }
272    }
273    
274    pub fn send_string(&mut self, data:String)->Result<(),()>{
275        let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
276        if let Some(sender) = &*sender{
277            sender.send(WebSocketThreadMsg::SendMessage{
278                socket_id: self.socket_id,
279                message: WebSocketMessage::String(data),
280            }).map_err(|_|())
281        }
282        else{
283            panic!("Web socket thread not running")
284        }
285    }
286    
287    pub fn try_recv(&mut self)->Result<WebSocketMessage,TryRecvError>{
288        self.rx_receiver.try_recv()
289    }
290    
291    pub fn recv(&mut self)->Result<WebSocketMessage,RecvError>{
292        self.rx_receiver.recv()
293    }
294    
295}