makepad_platform/
web_socket.rs1#[allow(unused_imports)]
2use crate::{
3 os::OsWebSocket,
4 cx_api::*,
5 Cx,
6 studio::{AppToStudio,AppToStudioVec},
7 event::{HttpMethod,HttpRequest},
8 makepad_micro_serde::*
9};
10#[allow(unused_imports)]
11use std::{
12 time::{Instant, Duration},
13 collections::HashMap,
14 cell::RefCell,
15 sync::{
16 Mutex,
17 atomic::{AtomicU64, AtomicBool, Ordering},
18 mpsc::{channel, Sender, Receiver, RecvTimeoutError, TryRecvError,RecvError}
19 }
20};
21
22#[derive(Debug)]
23pub enum WebSocketThreadMsg{
24 Open{
25 socket_id: u64,
26 request:HttpRequest,
27 rx_sender: Sender<WebSocketMessage>
28 },
29 Close{
30 socket_id: u64
31 },
32 SendMessage{
33 socket_id: u64,
34 message: WebSocketMessage
35 },
36 AppToStudio{
37 message: AppToStudio
38 },
39 Terminate
40}
41
42pub struct WebSocket{
43 socket_id: u64,
44 pub rx_receiver: Receiver<WebSocketMessage>,
45}
46
47#[derive(Debug)]
48pub enum WebSocketMessage{
49 Error(String),
50 Binary(Vec<u8>),
51 String(String),
52 Opened,
53 Closed
54}
55
56pub (crate) static WEB_SOCKET_THREAD_SENDER: Mutex<Option<Sender<WebSocketThreadMsg>>> = Mutex::new(None);
57pub (crate) static WEB_SOCKET_ID: AtomicU64 = AtomicU64::new(0);
58pub (crate) static HAS_STUDIO_WEB_SOCKET: AtomicBool = AtomicBool::new(false);
59
60impl Drop for WebSocket{
61 fn drop(&mut self){
62 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
63 if let Some(sender) = &*sender{
64 sender.send(WebSocketThreadMsg::Close{
65 socket_id: self.socket_id,
66 }).unwrap();
67 }
68 }
69}
70impl Cx{
71 pub fn has_studio_web_socket()->bool{
72 HAS_STUDIO_WEB_SOCKET.load(Ordering::SeqCst)
73 }
74
75 #[cfg(target_arch = "wasm32")]
76 fn run_websocket_thread(&mut self){
77 let (rx_sender, rx_receiver) = channel();
78 let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
79 *thread_sender = Some(rx_sender);
80 let sockets = Mutex::new(RefCell::new(HashMap::new()));
81 self.spawn_timer_thread(16, move ||{
82 let mut app_to_studio = AppToStudioVec(Vec::new());
83 while let Ok(msg) = rx_receiver.try_recv(){
84 match msg{
85 WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
86 let socket = OsWebSocket::open(socket_id, request, rx_sender);
87 sockets.lock().unwrap().borrow_mut().insert(socket_id, socket);
88 }
89 WebSocketThreadMsg::SendMessage{socket_id, message}=>{
90 if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&socket_id){
91 socket.send_message(message).unwrap();
92 }
93 }
94 WebSocketThreadMsg::AppToStudio{message}=>{
95 app_to_studio.0.push(message);
96
97 }
98 WebSocketThreadMsg::Close{socket_id}=>{
99 sockets.lock().unwrap().borrow_mut().remove(&socket_id);
100 }
101 WebSocketThreadMsg::Terminate{}=>{
102 for socket in sockets.lock().unwrap().borrow_mut().values_mut(){
103 socket.close();
104 }
105 sockets.lock().unwrap().borrow_mut().clear();
106 }
107
108 }
109 }
110 if app_to_studio.0.len()>0{
111 if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&0){
112 socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).unwrap()
113 }
114 }
115 });
116 }
117
118 #[cfg(not(target_arch = "wasm32"))]
119 fn run_websocket_thread(&mut self){
120 let (rx_sender, rx_receiver) = channel();
122 let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
123 *thread_sender = Some(rx_sender);
124 self.spawn_thread(move ||{
125 let mut sockets = HashMap::new();
127 let mut app_to_studio = AppToStudioVec(Vec::new());
128 let mut first_message = None;
129 let collect_time = Duration::from_millis(16);
130 let mut cycle_time = Duration::MAX;
131 loop{
132 match rx_receiver.recv_timeout(cycle_time){
135 Ok(msg)=>match msg{
136 WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
137 let socket = OsWebSocket::open(socket_id, request, rx_sender);
138 sockets.insert(socket_id, socket);
139 }
140 WebSocketThreadMsg::SendMessage{socket_id, message}=>{
141 if let Some(socket) = sockets.get_mut(&socket_id){
142 if socket.send_message(message).is_err(){
143 crate::log!("Websocket sender closed unexpectedly");
144 return
145 }
146 }
147 }
148 WebSocketThreadMsg::AppToStudio{message}=>{
149 if first_message.is_none(){
150 first_message = Some(Instant::now())
151 }
152 app_to_studio.0.push(message);
153 cycle_time = collect_time; }
155 WebSocketThreadMsg::Close{socket_id}=>{
156 if let Some(socket) = sockets.get_mut(&socket_id){
157 socket.close();
158 }
159 sockets.remove(&socket_id);
160 }
161 WebSocketThreadMsg::Terminate=>{
162 for socket in sockets.values_mut(){
163 socket.close();
164 }
165 *WEB_SOCKET_THREAD_SENDER.lock().unwrap() = None;
166 return;
167 }
168 },
169 Err(RecvTimeoutError::Timeout)=>{
170 }
171 Err(RecvTimeoutError::Disconnected)=>{
172 return
173 }
174 }
175 if let Some(first_time) = first_message{
176 if Instant::now().duration_since(first_time) >= collect_time{
177 if let Some(socket) = sockets.get_mut(&0){
179 if socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).is_err(){
180 println!("Studio websocket disconnected!");
181 break;
183 };
184 }
185 app_to_studio.0.clear();
186 first_message = None;
187 cycle_time = Duration::MAX;
188 }
189 }
190 }
191 });
192 }
193
194 fn start_studio_websocket(&mut self, studio_http: &str) {
195 if studio_http.len() == 0{
196 return
197 }
198 self.studio_http = studio_http.into();
199
200 #[cfg(all(not(target_os="tvos"), not(target_os="ios")))]{
201 HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
203 let request = HttpRequest::new(studio_http.to_string(), HttpMethod::GET);
204 self.studio_web_socket = Some(WebSocket::open(request));
205 }
206
207 }
208
209 pub fn stop_studio_websocket(&mut self){
210 self.studio_web_socket = None;
211 HAS_STUDIO_WEB_SOCKET.store(false, Ordering::SeqCst);
212 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
213 if let Some(sender) = &*sender{
214 sender.send(WebSocketThreadMsg::Terminate).unwrap();
215 }
216 }
217
218
219 #[cfg(any(target_os="tvos", target_os="ios"))]
220 pub fn start_studio_websocket_delayed(&mut self) {
221 HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
222 let request = HttpRequest::new(self.studio_http.clone(), HttpMethod::GET);
223 self.studio_web_socket = Some(WebSocket::open(request));
224 }
225
226 pub fn init_websockets(&mut self, studio_http: &str) {
227 self.run_websocket_thread();
228 self.start_studio_websocket(studio_http);
229 }
230
231 pub fn send_studio_message(msg:AppToStudio){
232 if !Cx::has_studio_web_socket(){
233 return
234 }
235 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
236 if let Some(sender) = &*sender{
237 let _= sender.send(WebSocketThreadMsg::AppToStudio{message:msg});
238 }
239 else{
240 println!("Web socket thread not running (yet) for {:?}", msg);
241 }
242 }
243}
244
245impl WebSocket{
246
247
248 pub fn open(request:HttpRequest)->WebSocket {
249 let (rx_sender, rx_receiver) = channel();
250 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
251 let socket_id = WEB_SOCKET_ID.fetch_add(1, Ordering::SeqCst);
252 if let Some(sender) = &*sender{
253 sender.send(WebSocketThreadMsg::Open{
254 socket_id,
255 rx_sender,
256 request
257 }).unwrap();
258 }
259 else{
260 panic!("Web socket thread not running")
261 }
262 WebSocket{
263 socket_id,
264 rx_receiver,
265 }
266 }
267
268 pub fn send_binary(&mut self, data:Vec<u8>)->Result<(),()>{
269 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
270 if let Some(sender) = &*sender{
271 sender.send(WebSocketThreadMsg::SendMessage{
272 socket_id: self.socket_id,
273 message: WebSocketMessage::Binary(data),
274 }).map_err(|_|())
275 }
276 else{
277 panic!("Web socket thread not running")
278 }
279 }
280
281 pub fn send_string(&mut self, data:String)->Result<(),()>{
282 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
283 if let Some(sender) = &*sender{
284 sender.send(WebSocketThreadMsg::SendMessage{
285 socket_id: self.socket_id,
286 message: WebSocketMessage::String(data),
287 }).map_err(|_|())
288 }
289 else{
290 panic!("Web socket thread not running")
291 }
292 }
293
294 pub fn try_recv(&mut self)->Result<WebSocketMessage,TryRecvError>{
295 self.rx_receiver.try_recv()
296 }
297
298 pub fn recv(&mut self)->Result<WebSocketMessage,RecvError>{
299 self.rx_receiver.recv()
300 }
301
302}