1use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
4use std::io::prelude::*;
5use std::sync::{mpsc, mpsc::{RecvTimeoutError}};
6use std::time::Duration;
7pub use crate::websocket::{SERVER_WEB_SOCKET_PONG_MESSAGE, ServerWebSocket, ServerWebSocketMessage, ServerWebSocketMessageFormat, ServerWebSocketMessageHeader, SERVER_WEB_SOCKET_PING_MESSAGE};
8use crate::utils::*;
9
10#[derive(Clone)]
11pub struct HttpServer {
12 pub listen_address: SocketAddr,
13 pub request: mpsc::Sender<HttpServerRequest>,
14 pub post_max_size: u64
15}
16
17pub struct HttpServerResponse {
18 pub header: String,
19 pub body: Vec<u8>
20}
21
22pub enum HttpServerRequest {
23 ConnectWebSocket {
24 web_socket_id: u64,
25 headers:HttpServerHeaders,
26 response_sender: mpsc::Sender<Vec<u8 >>,
27 },
28 DisconnectWebSocket {
29 web_socket_id: u64,
30 },
31 BinaryMessage {
32 web_socket_id: u64,
33 response_sender: mpsc::Sender<Vec<u8 >>,
34 data: Vec<u8>
35 },
36 Get {
37 headers: HttpServerHeaders,
38 response_sender: mpsc::Sender<HttpServerResponse>,
39 },
40 Post {
41 headers: HttpServerHeaders,
42 body: Vec<u8>,
43 response: mpsc::Sender<HttpServerResponse>,
44 }
45}
46
47pub fn start_http_server(
48 http_server: HttpServer,
49) -> Option<std::thread::JoinHandle<() >> {
50
51 let listener = if let Ok(listener) = TcpListener::bind(http_server.listen_address) {listener} else {println!("Cannot bind http server port"); return None};
52
53 let listen_thread = {
54 std::thread::spawn(move || {
55 let mut connection_counter = 0u64;
56 for tcp_stream in listener.incoming() {
57 let mut tcp_stream = if let Ok(tcp_stream) = tcp_stream {
58 tcp_stream
59 }
60 else {
61 println!("Incoming stream failure");
62 continue
63 };
64 let http_server = http_server.clone();
65 connection_counter += 1;
66 let _read_thread = std::thread::spawn(move || {
67
68 let headers = HttpServerHeaders::from_tcp_stream(&mut tcp_stream);
69 if headers.is_none() {
70 return http_error_out(tcp_stream, 500);
71 }
72 let headers = headers.unwrap();
73
74 if headers.sec_websocket_key.is_some() {
75 return handle_web_socket(http_server, tcp_stream, headers, connection_counter);
76 }
77 if headers.verb == "POST" {
78 return handle_post(http_server, tcp_stream, headers);
79 }
80 if headers.verb == "GET" {
81 return handle_get(http_server, tcp_stream, headers);
82 }
83 http_error_out(tcp_stream, 500)
84 });
85 }
86 })
87 };
88 Some(listen_thread)
89}
90
91fn handle_post(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
92 if headers.content_length.is_none() {
94 return http_error_out(tcp_stream, 500);
95 }
96 let content_length = headers.content_length.unwrap();
97 if content_length > http_server.post_max_size {
98 return http_error_out(tcp_stream, 500);
99 }
100 let bytes_total = content_length as usize;
101 let mut body = Vec::new();
102 body.resize(bytes_total, 0u8);
103
104 let mut bytes_left = bytes_total;
105 while bytes_left > 0 {
106 let buf = &mut body[(bytes_total - bytes_left)..bytes_total];
107 let bytes_read = tcp_stream.read(buf);
108 if bytes_read.is_err() {
109 return http_error_out(tcp_stream, 500);
110 }
111 let bytes_read = bytes_read.unwrap();
112 if bytes_read == 0 {
113 return http_error_out(tcp_stream, 500);
114 }
115 bytes_left -= bytes_read;
116 }
117
118 let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
119 if http_server.request.send(HttpServerRequest::Post {
120 headers,
121 body,
122 response: tx_socket
123 }).is_err() {
124 return http_error_out(tcp_stream, 500);
125 };
126
127 if let Ok(response) = rx_socket.recv() {
128 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
129 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
130 }
131 let _ = tcp_stream.shutdown(Shutdown::Both);
132}
133
134fn handle_web_socket(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders, web_socket_id: u64) {
135 let upgrade_response = ServerWebSocket::create_upgrade_response(headers.sec_websocket_key.as_ref().unwrap());
136
137 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, upgrade_response.as_bytes());
138
139 let mut write_tcp_stream = tcp_stream.try_clone().unwrap();
140 let (tx_socket, rx_socket) = mpsc::channel::<Vec<u8 >> ();
141
142 let _write_thread = std::thread::spawn(move || {
143 loop{
145 match rx_socket.recv_timeout(Duration::from_millis(2000)){
146 Ok(data)=>{
147 if data.is_empty(){
148 break
149 }
150 let header = ServerWebSocketMessageHeader::from_len(data.len(), ServerWebSocketMessageFormat::Binary, false);
151 write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, header.as_slice());
152 write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &data);
153 },
154 Err(RecvTimeoutError::Timeout)=>{
155 write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &SERVER_WEB_SOCKET_PING_MESSAGE);
156 }
157 Err(RecvTimeoutError::Disconnected)=>{
158 break
159 }
160 }
161 }
162 let _ = write_tcp_stream.shutdown(Shutdown::Both);
163 });
164
165 if http_server.request.send(HttpServerRequest::ConnectWebSocket {
166 headers,
167 web_socket_id,
168 response_sender: tx_socket.clone()
169 }).is_err() {
170 let _ = tcp_stream.shutdown(Shutdown::Both);
171 return
172 };
173
174 let mut web_socket = ServerWebSocket::new();
175 loop {
176 let mut data = [0u8; 65535];
177 match tcp_stream.read(&mut data) {
178 Ok(n) => {
179 if n == 0 {
180 let _ = tcp_stream.shutdown(Shutdown::Both);
181 let _ = tx_socket.send(Vec::new());
182 break
183 }
184 web_socket.parse(&data[0..n], | result | {
185 match result {
186 Ok(ServerWebSocketMessage::Ping(_)) => {
187 let _ = tx_socket.send(SERVER_WEB_SOCKET_PONG_MESSAGE.to_vec());
188 },
189 Ok(ServerWebSocketMessage::Pong(_)) => {
190 },
191 Ok(ServerWebSocketMessage::Text(_text)) => {
192 }
193 Ok(ServerWebSocketMessage::Binary(data)) => {
194 if http_server.request.send(HttpServerRequest::BinaryMessage {
195 web_socket_id,
196 response_sender: tx_socket.clone(),
197 data: data.to_vec(),
198 }).is_err() {
199 eprintln!("Websocket message deserialize error");
200 let _ = tcp_stream.shutdown(Shutdown::Both);
201 let _ = tx_socket.send(Vec::new());
202 };
203 },
204 Ok(ServerWebSocketMessage::Close) => {
205 let _ = tcp_stream.shutdown(Shutdown::Both);
206 }
207 Err(e) => {
208 eprintln!("Websocket error {:?}", e);
209 let _ = tcp_stream.shutdown(Shutdown::Both);
210 let _ = tx_socket.send(Vec::new());
211 }
212 }
213 });
214 }
215 Err(_) => {
216 println!("Websocket closed");
217 let _ = tcp_stream.shutdown(Shutdown::Both);
218 let _ = tx_socket.send(Vec::new());
219 break;
220 }
221 }
222 }
223
224 let _ = http_server.request.send(HttpServerRequest::DisconnectWebSocket {
225 web_socket_id,
226 });
227}
228
229fn handle_get(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
230 let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
232 if http_server.request.send(HttpServerRequest::Get {
233 headers,
234 response_sender: tx_socket
235 }).is_err() {
236 return http_error_out(tcp_stream, 500);
237 };
238
239 if let Ok(response) = rx_socket.recv() {
240 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
241 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
242 }
243 let _ = tcp_stream.shutdown(Shutdown::Both);
244}