#[allow(unused_imports)]
use crate::{
os::OsWebSocket,
cx_api::*,
Cx,
studio::{AppToStudio,AppToStudioVec},
event::{HttpMethod,HttpRequest},
makepad_micro_serde::*
};
#[allow(unused_imports)]
use std::{
time::{Instant, Duration},
collections::HashMap,
cell::RefCell,
sync::{
Mutex,
atomic::{AtomicU64, AtomicBool, Ordering},
mpsc::{channel, Sender, Receiver, RecvTimeoutError, TryRecvError,RecvError}
}
};
#[derive(Debug)]
pub enum WebSocketThreadMsg{
Open{
socket_id: u64,
request:HttpRequest,
rx_sender: Sender<WebSocketMessage>
},
Close{
socket_id: u64
},
SendMessage{
socket_id: u64,
message: WebSocketMessage
},
AppToStudio{
message: AppToStudio
},
Terminate
}
pub struct WebSocket{
socket_id: u64,
pub rx_receiver: Receiver<WebSocketMessage>,
}
#[derive(Debug)]
pub enum WebSocketMessage{
Error(String),
Binary(Vec<u8>),
String(String),
Opened,
Closed
}
pub (crate) static WEB_SOCKET_THREAD_SENDER: Mutex<Option<Sender<WebSocketThreadMsg>>> = Mutex::new(None);
pub (crate) static WEB_SOCKET_ID: AtomicU64 = AtomicU64::new(0);
pub (crate) static HAS_STUDIO_WEB_SOCKET: AtomicBool = AtomicBool::new(false);
impl Drop for WebSocket{
fn drop(&mut self){
let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
if let Some(sender) = &*sender{
sender.send(WebSocketThreadMsg::Close{
socket_id: self.socket_id,
}).unwrap();
}
}
}
impl Cx{
pub fn has_studio_web_socket()->bool{
HAS_STUDIO_WEB_SOCKET.load(Ordering::SeqCst)
}
#[cfg(target_arch = "wasm32")]
fn run_websocket_thread(&mut self){
let (rx_sender, rx_receiver) = channel();
let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
*thread_sender = Some(rx_sender);
let sockets = Mutex::new(RefCell::new(HashMap::new()));
self.spawn_timer_thread(16, move ||{
let mut app_to_studio = AppToStudioVec(Vec::new());
while let Ok(msg) = rx_receiver.try_recv(){
match msg{
WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
let socket = OsWebSocket::open(socket_id, request, rx_sender);
sockets.lock().unwrap().borrow_mut().insert(socket_id, socket);
}
WebSocketThreadMsg::SendMessage{socket_id, message}=>{
if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&socket_id){
socket.send_message(message).unwrap();
}
}
WebSocketThreadMsg::AppToStudio{message}=>{
app_to_studio.0.push(message);
}
WebSocketThreadMsg::Close{socket_id}=>{
sockets.lock().unwrap().borrow_mut().remove(&socket_id);
}
WebSocketThreadMsg::Terminate{}=>{
for socket in sockets.lock().unwrap().borrow_mut().values_mut(){
socket.close();
}
sockets.lock().unwrap().borrow_mut().clear();
}
}
}
if app_to_studio.0.len()>0{
if let Some(socket) = sockets.lock().unwrap().borrow_mut().get_mut(&0){
socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).unwrap()
}
}
});
}
#[cfg(not(target_arch = "wasm32"))]
fn run_websocket_thread(&mut self){
let (rx_sender, rx_receiver) = channel();
let mut thread_sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
*thread_sender = Some(rx_sender);
self.spawn_thread(move ||{
let mut sockets = HashMap::new();
let mut app_to_studio = AppToStudioVec(Vec::new());
let mut first_message = None;
let collect_time = Duration::from_millis(16);
let mut cycle_time = Duration::MAX;
loop{
match rx_receiver.recv_timeout(cycle_time){
Ok(msg)=>match msg{
WebSocketThreadMsg::Open{socket_id, request, rx_sender}=>{
let socket = OsWebSocket::open(socket_id, request, rx_sender);
sockets.insert(socket_id, socket);
}
WebSocketThreadMsg::SendMessage{socket_id, message}=>{
if let Some(socket) = sockets.get_mut(&socket_id){
if socket.send_message(message).is_err(){
crate::log!("Websocket sender closed unexpectedly");
return
}
}
}
WebSocketThreadMsg::AppToStudio{message}=>{
if first_message.is_none(){
first_message = Some(Instant::now())
}
app_to_studio.0.push(message);
cycle_time = collect_time; }
WebSocketThreadMsg::Close{socket_id}=>{
if let Some(socket) = sockets.get_mut(&socket_id){
socket.close();
}
sockets.remove(&socket_id);
}
WebSocketThreadMsg::Terminate=>{
for socket in sockets.values_mut(){
socket.close();
}
*WEB_SOCKET_THREAD_SENDER.lock().unwrap() = None;
return;
}
},
Err(RecvTimeoutError::Timeout)=>{
}
Err(RecvTimeoutError::Disconnected)=>{
return
}
}
if let Some(first_time) = first_message{
if Instant::now().duration_since(first_time) >= collect_time{
if let Some(socket) = sockets.get_mut(&0){
if socket.send_message(WebSocketMessage::Binary(app_to_studio.serialize_bin())).is_err(){
println!("Studio websocket disconnected!");
break;
};
}
app_to_studio.0.clear();
first_message = None;
cycle_time = Duration::MAX;
}
}
}
});
}
fn start_studio_websocket(&mut self, studio_http: &str) {
if studio_http.len() == 0{
return
}
self.studio_http = studio_http.into();
#[cfg(all(not(target_os="tvos"), not(target_os="ios")))]{
HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
let request = HttpRequest::new(studio_http.to_string(), HttpMethod::GET);
self.studio_web_socket = Some(WebSocket::open(request));
}
}
pub fn stop_studio_websocket(&mut self){
self.studio_web_socket = None;
HAS_STUDIO_WEB_SOCKET.store(false, Ordering::SeqCst);
let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
if let Some(sender) = &*sender{
sender.send(WebSocketThreadMsg::Terminate).unwrap();
}
}
#[cfg(any(target_os="tvos", target_os="ios"))]
pub fn start_studio_websocket_delayed(&mut self) {
HAS_STUDIO_WEB_SOCKET.store(true, Ordering::SeqCst);
let request = HttpRequest::new(self.studio_http.clone(), HttpMethod::GET);
self.studio_web_socket = Some(WebSocket::open(request));
}
pub fn init_websockets(&mut self, studio_http: &str) {
self.run_websocket_thread();
self.start_studio_websocket(studio_http);
}
pub fn send_studio_message(msg:AppToStudio){
if !Cx::has_studio_web_socket(){
return
}
let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
if let Some(sender) = &*sender{
let _= sender.send(WebSocketThreadMsg::AppToStudio{message:msg});
}
else{
println!("Web socket thread not running (yet) for {:?}", msg);
}
}
}
impl WebSocket{
pub fn open(request:HttpRequest)->WebSocket {
let (rx_sender, rx_receiver) = channel();
let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
let socket_id = WEB_SOCKET_ID.fetch_add(1, Ordering::SeqCst);
if let Some(sender) = &*sender{
sender.send(WebSocketThreadMsg::Open{
socket_id,
rx_sender,
request
}).unwrap();
}
else{
panic!("Web socket thread not running")
}
WebSocket{
socket_id,
rx_receiver,
}
}
pub fn send_binary(&mut self, data:Vec<u8>)->Result<(),()>{
let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
if let Some(sender) = &*sender{
sender.send(WebSocketThreadMsg::SendMessage{
socket_id: self.socket_id,
message: WebSocketMessage::Binary(data),
}).map_err(|_|())
}
else{
panic!("Web socket thread not running")
}
}
pub fn send_string(&mut self, data:String)->Result<(),()>{
let sender = WEB_SOCKET_THREAD_SENDER.lock().unwrap();
if let Some(sender) = &*sender{
sender.send(WebSocketThreadMsg::SendMessage{
socket_id: self.socket_id,
message: WebSocketMessage::String(data),
}).map_err(|_|())
}
else{
panic!("Web socket thread not running")
}
}
pub fn try_recv(&mut self)->Result<WebSocketMessage,TryRecvError>{
self.rx_receiver.try_recv()
}
pub fn recv(&mut self)->Result<WebSocketMessage,RecvError>{
self.rx_receiver.recv()
}
}