makepad_platform/os/linux/
web_socket.rs1
2use crate::event::HttpRequest;
3use crate::web_socket::{WebSocketMessage};
4use std::sync::mpsc::{channel, Sender};
5use std::net::{TcpStream, Shutdown};
6use std::io::{Read};
7use makepad_http::utils::write_bytes_to_tcp_stream_no_error;
8use makepad_http::websocket::{ServerWebSocket, ServerWebSocketMessageFormat, ServerWebSocketMessageHeader, ServerWebSocketMessage, SERVER_WEB_SOCKET_PONG_MESSAGE};
9
10pub struct OsWebSocket{
11 sender: Option<Sender<WebSocketMessage>>,
12 stream: Option<TcpStream>,
13}
14
15impl Drop for OsWebSocket{
16 fn drop(&mut self){
17 self.sender.take();
18 if let Some(stream) = self.stream.take(){
19 stream.shutdown(Shutdown::Both).ok();
20 }
21 }
22}
23
24impl OsWebSocket{
25 pub fn send_message(&mut self, message:WebSocketMessage)->Result<(),()>{
26 if let Some(sender) = &mut self.sender{
28 if sender.send(message).is_err(){
29 return Err(());
30 }
31 return Ok(())
32 }
33 Err(())
34 }
35
36 pub fn close(&mut self){
37 }
38
39 pub fn open(_socket_id:u64, request: HttpRequest, rx_sender:Sender<WebSocketMessage>)->OsWebSocket{
40 let split = request.split_url();
42 let stream = TcpStream::connect(format!("{}:{}", split.host, split.port));
46 let mut http_request = format!("GET /{} HTTP/1.1\r\nHost: {}\r\nConnection: Upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 13\r\nSec-WebSocket-Key: SxJdXBRtW7Q4awLDhflO0Q==\r\n", split.file, split.host);
50 http_request.push_str(&request.get_headers_string());
51 http_request.push_str("\r\n");
52
53 if stream.is_err(){
55 rx_sender.send(WebSocketMessage::Error("Error connecting websocket tcpstream".into())).unwrap();
56 return OsWebSocket{sender:None, stream:None}
57 }
58 let mut stream = stream.unwrap();
59 if write_bytes_to_tcp_stream_no_error(&mut stream, http_request.as_bytes()){
60 rx_sender.send(WebSocketMessage::Error("Error writing request to websocket".into())).unwrap();
61 return OsWebSocket{sender:None, stream:None}
62 }
63
64 let mut input_stream = stream.try_clone().unwrap();
66 let mut output_stream = stream.try_clone().unwrap();
67 let (sender, receiver) = channel();
68
69 let _writer_thread = std::thread::spawn(move || {
70 while let Ok(msg) = receiver.recv(){
71 match msg{
72 WebSocketMessage::Binary(data)=>{
73 let header = ServerWebSocketMessageHeader::from_len(data.len(), ServerWebSocketMessageFormat::Binary, false);
74 if write_bytes_to_tcp_stream_no_error(&mut output_stream, header.as_slice()) ||
75 write_bytes_to_tcp_stream_no_error(&mut output_stream, &data){
76 break;
77 }
78 }
79 WebSocketMessage::String(data)=>{
80 let header = ServerWebSocketMessageHeader::from_len(data.len(), ServerWebSocketMessageFormat::Binary, false);
81 if write_bytes_to_tcp_stream_no_error(&mut output_stream, header.as_slice()) ||
82 write_bytes_to_tcp_stream_no_error(&mut output_stream, &data.as_bytes()){
83 break;
84 }
85 }
86 _=>{
87 crate::error!("WebSocketMessage of this type sending not implemented");
88 }
89 }
90 }
91 });
92
93 let _reader_thread = std::thread::spawn(move || {
94 let mut web_socket = ServerWebSocket::new();
95 let mut done = false;
96 let mut first = true;
97 while !done {
98 let mut buffer = [0u8; 65535];
99 match input_stream.read(&mut buffer) {
100 Ok(bytes_read) => {
101 if first{
102 first = false;
103 continue;
104 }
105 web_socket.parse(&buffer[0..bytes_read], | result | {
106 match result {
107 Ok(ServerWebSocketMessage::Ping(_)) => {
108 if write_bytes_to_tcp_stream_no_error(&mut input_stream, &SERVER_WEB_SOCKET_PONG_MESSAGE){
109 done = true;
110 let _ = rx_sender.send(WebSocketMessage::Error("Pong message send failed".into()));
111 }
112 },
113 Ok(ServerWebSocketMessage::Pong(_)) => {
114 },
115 Ok(ServerWebSocketMessage::Text(text)) => {
116 if rx_sender.send(WebSocketMessage::String(text.into())).is_err(){
117 done = true;
118 };
119 },
120 Ok(ServerWebSocketMessage::Binary(data)) => {
121 if rx_sender.send(WebSocketMessage::Binary(data.into())).is_err(){
122 done = true;
123 };
124 },
125 Ok(ServerWebSocketMessage::Close) => {
126 let _ = rx_sender.send(WebSocketMessage::Closed);
127 done = true;
128 },
129 Err(e) => {
130 eprintln!("Websocket error {:?}", e);
131 }
132 }
133 });
134 }
135 Err(e) => {
136 eprintln!("Failed to receive data: {}", e);
137 let _ = rx_sender.send(WebSocketMessage::Closed);
138 done = true;
139 }
140 }
141 }
142 });
143
144 OsWebSocket{sender:Some(sender), stream:Some(stream)}
145 }
146}