makepad_platform/os/linux/
web_socket.rs

1
2use crate::event::HttpRequest;
3use crate::web_socket::{WebSocketMessage};
4use std::sync::mpsc::{channel, Sender};
5use std::net::{TcpStream, Shutdown};
6use std::io::{Read};
7use makepad_http::utils::write_bytes_to_tcp_stream_no_error;
8use makepad_http::websocket::{ServerWebSocket, ServerWebSocketMessageFormat, ServerWebSocketMessageHeader, ServerWebSocketMessage, SERVER_WEB_SOCKET_PONG_MESSAGE};
9
10pub struct OsWebSocket{
11    sender: Option<Sender<WebSocketMessage>>,
12    stream: Option<TcpStream>,
13}
14
15impl Drop for OsWebSocket{
16    fn drop(&mut self){
17        self.sender.take();
18        if let Some(stream) = self.stream.take(){
19            stream.shutdown(Shutdown::Both).ok();
20        }
21    }
22}
23
24impl OsWebSocket{
25    pub fn send_message(&mut self, message:WebSocketMessage)->Result<(),()>{
26        // lets encode the message into a membuffer and send it to the write thread
27        if let Some(sender) = &mut self.sender{
28            if sender.send(message).is_err(){
29                return Err(());
30            }
31            return Ok(())
32        }
33        Err(())
34    }
35            
36    pub fn close(&mut self){
37    }
38            
39    pub fn open(_socket_id:u64, request: HttpRequest, rx_sender:Sender<WebSocketMessage>)->OsWebSocket{
40        // parse the url
41        let split = request.split_url();
42        // strip off any hashes
43        // alright we have proto, host, port and hash now
44        // lets open a tcpstream
45        let stream = TcpStream::connect(format!("{}:{}", split.host, split.port));
46        // alright lets construct a http request
47        // lets join the headers
48                        
49        let mut http_request = format!("GET /{} HTTP/1.1\r\nHost: {}\r\nConnection: Upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 13\r\nSec-WebSocket-Key: SxJdXBRtW7Q4awLDhflO0Q==\r\n", split.file, split.host);
50        http_request.push_str(&request.get_headers_string());
51        http_request.push_str("\r\n"); 
52                        
53        // lets write the http request
54        if stream.is_err(){
55            rx_sender.send(WebSocketMessage::Error("Error connecting websocket tcpstream".into())).unwrap();
56            return OsWebSocket{sender:None, stream:None}
57        }
58        let mut stream = stream.unwrap();
59        if write_bytes_to_tcp_stream_no_error(&mut stream, http_request.as_bytes()){
60            rx_sender.send(WebSocketMessage::Error("Error writing request to websocket".into())).unwrap();
61            return OsWebSocket{sender:None, stream:None}
62        }
63                        
64        // lets start the thread
65        let mut input_stream = stream.try_clone().unwrap();
66        let mut output_stream = stream.try_clone().unwrap();
67        let (sender, receiver) = channel();
68                        
69        let _writer_thread = std::thread::spawn(move || {
70            while let Ok(msg) = receiver.recv(){
71                match msg{
72                    WebSocketMessage::Binary(data)=>{
73                        let header = ServerWebSocketMessageHeader::from_len(data.len(), ServerWebSocketMessageFormat::Binary, false);
74                        if write_bytes_to_tcp_stream_no_error(&mut output_stream, header.as_slice()) ||
75                        write_bytes_to_tcp_stream_no_error(&mut output_stream, &data){
76                            break;
77                        }
78                    }
79                    WebSocketMessage::String(data)=>{
80                        let header = ServerWebSocketMessageHeader::from_len(data.len(), ServerWebSocketMessageFormat::Binary, false);
81                        if write_bytes_to_tcp_stream_no_error(&mut output_stream, header.as_slice()) ||
82                        write_bytes_to_tcp_stream_no_error(&mut output_stream, &data.as_bytes()){
83                            break;
84                        }
85                    }
86                    _=>{
87                        crate::error!("WebSocketMessage of this type sending not implemented");
88                    }
89                }
90            }
91        });
92                        
93        let _reader_thread = std::thread::spawn(move || {
94            let mut web_socket = ServerWebSocket::new();
95            let mut done = false;
96            let mut first = true;
97            while !done {
98                let mut buffer = [0u8; 65535];
99                match input_stream.read(&mut buffer) {
100                    Ok(bytes_read) => {
101                        if first{
102                            first = false;
103                            continue;
104                        }
105                        web_socket.parse(&buffer[0..bytes_read], | result | {
106                            match result {
107                                Ok(ServerWebSocketMessage::Ping(_)) => {
108                                    if write_bytes_to_tcp_stream_no_error(&mut input_stream, &SERVER_WEB_SOCKET_PONG_MESSAGE){
109                                        done = true;
110                                        let _ = rx_sender.send(WebSocketMessage::Error("Pong message send failed".into()));
111                                    }
112                                },
113                                Ok(ServerWebSocketMessage::Pong(_)) => {
114                                },
115                                Ok(ServerWebSocketMessage::Text(text)) => {
116                                    if rx_sender.send(WebSocketMessage::String(text.into())).is_err(){
117                                        done = true;
118                                    };
119                                },
120                                Ok(ServerWebSocketMessage::Binary(data)) => {
121                                    if rx_sender.send(WebSocketMessage::Binary(data.into())).is_err(){
122                                        done = true;
123                                    };
124                                },
125                                Ok(ServerWebSocketMessage::Close) => {
126                                    let _ = rx_sender.send(WebSocketMessage::Closed);
127                                    done = true;
128                                },
129                                Err(e) => {
130                                    eprintln!("Websocket error {:?}", e);
131                                }
132                            }
133                        });                            
134                    }
135                    Err(e) => {
136                        eprintln!("Failed to receive data: {}", e);
137                        let _ = rx_sender.send(WebSocketMessage::Closed);
138                        done = true;
139                    }
140                }
141            }
142        });
143                                
144        OsWebSocket{sender:Some(sender), stream:Some(stream)}
145    }
146}