// makepad_platform/web_socket.rs
#[allow(unused_imports)]
2use crate::{
3 os::OsWebSocket,
4 cx_api::*,
5 Cx,
6 studio::{AppToStudio,AppToStudioVec},
7 event::{HttpMethod,HttpRequest},
8 makepad_micro_serde::*
9};
10#[allow(unused_imports)]
11use std::{
12 time::{Instant, Duration},
13 collections::HashMap,
14 cell::RefCell,
15 sync::{
16 Mutex,
17 atomic::{AtomicU64, AtomicBool, Ordering},
18 mpsc::{channel, Sender, Receiver, RecvTimeoutError, TryRecvError,RecvError}
19 }
20};
21
22#[derive(Debug)]
23pub enum WebSocketThreadMsg{
24 Open{
25 socket_id: u64,
26 request:HttpRequest,
27 rx_sender: Sender<WebSocketMessage>
28 },
29 Close{
30 socket_id: u64
31 },
32 SendMessage{
33 socket_id: u64,
34 message: WebSocketMessage
35 },
36 AppToStudio{
37 message: AppToStudio
38 },
39 Terminate
40}
41
42pub struct WebSocket{
43 socket_id: u64,
44 pub rx_receiver: Receiver<WebSocketMessage>,
45}
46
/// Message travelling over a websocket channel.
///
/// `Binary` and `String` are what `WebSocket::send_binary` / `WebSocket::send_string`
/// wrap outgoing payloads in. The remaining variants are presumably status reports
/// from the platform socket implementation — that producer is not visible in this file.
#[derive(Debug)]
pub enum WebSocketMessage {
    /// An error, described as text.
    Error(String),
    /// A binary payload.
    Binary(Vec<u8>),
    /// A text payload.
    String(String),
    /// NOTE(review): presumably signals that the connection was established — confirm.
    Opened,
    /// NOTE(review): presumably signals that the connection ended — confirm.
    Closed,
}
55
56pub (crate) static WEB_SOCKET_THREAD_SENDER: Mutex<Option<Sender<WebSocketThreadMsg>>> = Mutex::new(None);
57pub (crate) static WEB_SOCKET_ID: AtomicU64 = AtomicU64::new(0);
58pub (crate) static HAS_STUDIO_WEB_SOCKET: AtomicBool = AtomicBool::new(false);
59
60impl Drop for WebSocket{
61 fn drop(&mut self){
62 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
63 if let Some(sender) = &*sender{
64 sender.send(WebSocketThreadMsg::Close{
65 socket_id: self.socket_id,
66 }).unwrap();
67 }
68 }
69}
70impl Cx{
71 pub fn has_studio_web_socket()->bool{
72 HAS_STUDIO_WEB_SOCKET.load(Ordering::SeqCst)
73 }
74
75 #[cfg(target_arch = "wasm32")]
76 fn run_websocket_thread(&mut self){
77 let (rx_sender, rx_receiver) = channel();
78 let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
79 *thread_sender = Some(rx_sender);
80 let sockets = Mutex::new(RefCell::new(HashMap::new()));
81 self.spawn_timer_thread(16, move ||{
82 let mut app_to_studio = AppToStudioVec(Vec::new());
83 while let Ok(msg) = rx_receiver.try_recv(){
84 match msg{
85 WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
86 let socket = OsWebSocket::open(socket_id, request, rx_sender);
87 sockets.lock().unwrap().borrow_mut().insert(socket_id, socket);
88 }
89 WebSocketThreadMsg::SendMessage{socket_id, message}=>{
90 if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&socket_id){
91 socket.send_message(message).unwrap();
92 }
93 }
94 WebSocketThreadMsg::AppToStudio{message}=>{
95 app_to_studio.0.push(message);
96
97 }
98 WebSocketThreadMsg::Close{socket_id}=>{
99 sockets.lock().unwrap().borrow_mut().remove(&socket_id);
100 }
101 WebSocketThreadMsg::Terminate{}=>{
102 for socket in sockets.lock().unwrap().borrow_mut().values_mut(){
103 socket.close();
104 }
105 sockets.lock().unwrap().borrow_mut().clear();
106 }
107
108 }
109 }
110 if app_to_studio.0.len()>0{
111 if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&0){
112 socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).unwrap()
113 }
114 }
115 });
116 }
117
118 #[cfg(not(target_arch = "wasm32"))]
119 fn run_websocket_thread(&mut self){
120 let (rx_sender, rx_receiver) = channel();
122 let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
123 *thread_sender = Some(rx_sender);
124 self.spawn_thread(move ||{
125 let mut sockets = HashMap::new();
127 let mut app_to_studio = AppToStudioVec(Vec::new());
128 let mut first_message = None;
129 let collect_time = Duration::from_millis(16);
130 let mut cycle_time = Duration::MAX;
131 loop{
132 match rx_receiver.recv_timeout(cycle_time){
135 Ok(msg)=>match msg{
136 WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
137 let socket = OsWebSocket::open(socket_id, request, rx_sender);
138 sockets.insert(socket_id, socket);
139 }
140 WebSocketThreadMsg::SendMessage{socket_id, message}=>{
141 if let Some(socket) = sockets.get_mut(&socket_id){
142 socket.send_message(message).unwrap();
143 }
144 }
145 WebSocketThreadMsg::AppToStudio{message}=>{
146 if first_message.is_none(){
147 first_message = Some(Instant::now())
148 }
149 app_to_studio.0.push(message);
150 cycle_time = collect_time; }
152 WebSocketThreadMsg::Close{socket_id}=>{
153 if let Some(socket) = sockets.get_mut(&socket_id){
154 socket.close();
155 }
156 sockets.remove(&socket_id);
157 }
158 WebSocketThreadMsg::Terminate=>{
159 for socket in sockets.values_mut(){
160 socket.close();
161 }
162 *WEB_SOCKET_THREAD_SENDER.lock().unwrap() = None;
163 return;
164 }
165 },
166 Err(RecvTimeoutError::Timeout)=>{
167 }
168 Err(RecvTimeoutError::Disconnected)=>{
169 return
170 }
171 }
172 if let Some(first_time) = first_message{
173 if Instant::now().duration_since(first_time) >= collect_time{
174 if let Some(socket) = sockets.get_mut(&0){
176 socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).unwrap();
177 }
178 app_to_studio.0.clear();
179 first_message = None;
180 cycle_time = Duration::MAX;
181 }
182 }
183 }
184 });
185 }
186
187 fn start_studio_websocket(&mut self, studio_http: &str) {
188 if studio_http.len() == 0{
189 return
190 }
191 self.studio_http = studio_http.into();
192
193 #[cfg(all(not(target_os="tvos"), not(target_os="ios")))]{
194 HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
196 let request = HttpRequest::new(studio_http.to_string(), HttpMethod::GET);
197 self.studio_web_socket = Some(WebSocket::open(request));
198 }
199
200 }
201
202 pub fn stop_studio_websocket(&mut self){
203 self.studio_web_socket = None;
204 HAS_STUDIO_WEB_SOCKET.store(false, Ordering::SeqCst);
205 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
206 if let Some(sender) = &*sender{
207 sender.send(WebSocketThreadMsg::Terminate).unwrap();
208 }
209 }
210
211
212 #[cfg(any(target_os="tvos", target_os="ios"))]
213 pub fn start_studio_websocket_delayed(&mut self) {
214 HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
215 let request = HttpRequest::new(self.studio_http.clone(), HttpMethod::GET);
216 self.studio_web_socket = Some(WebSocket::open(request));
217 }
218
219 pub fn init_websockets(&mut self, studio_http: &str) {
220 self.run_websocket_thread();
221 self.start_studio_websocket(studio_http);
222 }
223
224 pub fn send_studio_message(msg:AppToStudio){
225 if !Cx::has_studio_web_socket(){
226 return
227 }
228 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
229 if let Some(sender) = &*sender{
230 let _= sender.send(WebSocketThreadMsg::AppToStudio{message:msg});
231 }
232 else{
233 println!("Web socket thread not running (yet) for {:?}", msg);
234 }
235 }
236}
237
238impl WebSocket{
239
240
241 pub fn open(request:HttpRequest)->WebSocket {
242 let (rx_sender, rx_receiver) = channel();
243 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
244 let socket_id = WEB_SOCKET_ID.fetch_add(1, Ordering::SeqCst);
245 if let Some(sender) = &*sender{
246 sender.send(WebSocketThreadMsg::Open{
247 socket_id,
248 rx_sender,
249 request
250 }).unwrap();
251 }
252 else{
253 panic!("Web socket thread not running")
254 }
255 WebSocket{
256 socket_id,
257 rx_receiver,
258 }
259 }
260
261 pub fn send_binary(&mut self, data:Vec<u8>)->Result<(),()>{
262 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
263 if let Some(sender) = &*sender{
264 sender.send(WebSocketThreadMsg::SendMessage{
265 socket_id: self.socket_id,
266 message: WebSocketMessage::Binary(data),
267 }).map_err(|_|())
268 }
269 else{
270 panic!("Web socket thread not running")
271 }
272 }
273
274 pub fn send_string(&mut self, data:String)->Result<(),()>{
275 let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
276 if let Some(sender) = &*sender{
277 sender.send(WebSocketThreadMsg::SendMessage{
278 socket_id: self.socket_id,
279 message: WebSocketMessage::String(data),
280 }).map_err(|_|())
281 }
282 else{
283 panic!("Web socket thread not running")
284 }
285 }
286
287 pub fn try_recv(&mut self)->Result<WebSocketMessage,TryRecvError>{
288 self.rx_receiver.try_recv()
289 }
290
291 pub fn recv(&mut self)->Result<WebSocketMessage,RecvError>{
292 self.rx_receiver.recv()
293 }
294
295}