1use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
4use std::io::prelude::*;
5use std::sync::{mpsc, mpsc::{RecvTimeoutError}};
6use std::time::Duration;
7
8use crate::websocket::{WebSocket, WebSocketMessage, BinaryMessageHeader, PING_MESSAGE};
9use crate::utils::*;
10
11#[derive(Clone)]
12pub struct HttpServer {
13 pub listen_address: SocketAddr,
14 pub request: mpsc::Sender<HttpServerRequest>,
15 pub post_max_size: u64
16}
17
18pub struct HttpServerResponse {
19 pub header: String,
20 pub body: Vec<u8>
21}
22
23pub enum HttpServerRequest {
24 ConnectWebSocket {
25 web_socket_id: u64,
26 headers:HttpServerHeaders,
27 response_sender: mpsc::Sender<Vec<u8 >>,
28 },
29 DisconnectWebSocket {
30 web_socket_id: u64,
31 },
32 BinaryMessage {
33 web_socket_id: u64,
34 response_sender: mpsc::Sender<Vec<u8 >>,
35 data: Vec<u8>
36 },
37 Get {
38 headers: HttpServerHeaders,
39 response_sender: mpsc::Sender<HttpServerResponse>,
40 },
41 Post {
42 headers: HttpServerHeaders,
43 body: Vec<u8>,
44 response: mpsc::Sender<HttpServerResponse>,
45 }
46}
47
48pub fn start_http_server(
49 http_server: HttpServer,
50) -> Option<std::thread::JoinHandle<() >> {
51
52 let listener = if let Ok(listener) = TcpListener::bind(http_server.listen_address) {listener} else {println!("Cannot bind http server port"); return None};
53
54 let listen_thread = {
55 std::thread::spawn(move || {
56 let mut connection_counter = 0u64;
57 for tcp_stream in listener.incoming() {
58 let mut tcp_stream = if let Ok(tcp_stream) = tcp_stream {
59 tcp_stream
60 }
61 else {
62 println!("Incoming stream failure");
63 continue
64 };
65 let http_server = http_server.clone();
66 connection_counter += 1;
67 let _read_thread = std::thread::spawn(move || {
68
69 let headers = HttpServerHeaders::from_tcp_stream(&mut tcp_stream);
70 if headers.is_none() {
71 return http_error_out(tcp_stream, 500);
72 }
73 let headers = headers.unwrap();
74
75 if headers.sec_websocket_key.is_some() {
76 return handle_web_socket(http_server, tcp_stream, headers, connection_counter);
77 }
78 if headers.verb == "POST" {
79 return handle_post(http_server, tcp_stream, headers);
80 }
81 if headers.verb == "GET" {
82 return handle_get(http_server, tcp_stream, headers);
83 }
84 http_error_out(tcp_stream, 500)
85 });
86 }
87 })
88 };
89 Some(listen_thread)
90}
91
92fn handle_post(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
93 if headers.content_length.is_none() {
95 return http_error_out(tcp_stream, 500);
96 }
97 let content_length = headers.content_length.unwrap();
98 if content_length > http_server.post_max_size {
99 return http_error_out(tcp_stream, 500);
100 }
101 let bytes_total = content_length as usize;
102 let mut body = Vec::new();
103 body.resize(bytes_total, 0u8);
104
105 let mut bytes_left = bytes_total;
106 while bytes_left > 0 {
107 let buf = &mut body[(bytes_total - bytes_left)..bytes_total];
108 let bytes_read = tcp_stream.read(buf);
109 if bytes_read.is_err() {
110 return http_error_out(tcp_stream, 500);
111 }
112 let bytes_read = bytes_read.unwrap();
113 if bytes_read == 0 {
114 return http_error_out(tcp_stream, 500);
115 }
116 bytes_left -= bytes_read;
117 }
118
119 let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
120 if http_server.request.send(HttpServerRequest::Post {
121 headers,
122 body,
123 response: tx_socket
124 }).is_err() {
125 return http_error_out(tcp_stream, 500);
126 };
127
128 if let Ok(response) = rx_socket.recv() {
129 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
130 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
131 }
132 let _ = tcp_stream.shutdown(Shutdown::Both);
133}
134
135fn handle_web_socket(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders, web_socket_id: u64) {
136 let upgrade_response = WebSocket::create_upgrade_response(headers.sec_websocket_key.as_ref().unwrap());
137
138 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, upgrade_response.as_bytes());
139
140 let mut write_tcp_stream = tcp_stream.try_clone().unwrap();
141 let (tx_socket, rx_socket) = mpsc::channel::<Vec<u8 >> ();
142
143 let _write_thread = std::thread::spawn(move || {
144 loop{
146 match rx_socket.recv_timeout(Duration::from_millis(2000)){
147 Ok(data)=>{
148 if data.is_empty(){
149 println!("Write socket closed");
150 break
151 }
152 let header = BinaryMessageHeader::from_len(data.len());
153 write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, header.as_slice());
154 write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &data);
155 },
156 Err(RecvTimeoutError::Timeout)=>{
157 write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &PING_MESSAGE);
158 }
159 Err(RecvTimeoutError::Disconnected)=>{
160 println!("Write socket closed");
161 break
162 }
163 }
164 }
165 let _ = write_tcp_stream.shutdown(Shutdown::Both);
166 });
167
168 if http_server.request.send(HttpServerRequest::ConnectWebSocket {
169 headers,
170 web_socket_id,
171 response_sender: tx_socket.clone()
172 }).is_err() {
173 let _ = tcp_stream.shutdown(Shutdown::Both);
174 return
175 };
176
177 let mut web_socket = WebSocket::new();
178 loop {
179 let mut data = [0u8; 65535];
180 match tcp_stream.read(&mut data) {
181 Ok(n) => {
182 if n == 0 {
183 println!("Websocket closed");
184 let _ = tcp_stream.shutdown(Shutdown::Both);
185 let _ = tx_socket.send(Vec::new());
186 break
187 }
188 web_socket.parse(&data[0..n], | result | {
189 match result {
190 Ok(WebSocketMessage::Ping(_)) => {
191 },
192 Ok(WebSocketMessage::Pong(_)) => {
193 },
194 Ok(WebSocketMessage::Text(_text)) => {
195 println!("Websocket text");
196 }
197 Ok(WebSocketMessage::Binary(data)) => {
198 if http_server.request.send(HttpServerRequest::BinaryMessage {
199 web_socket_id,
200 response_sender: tx_socket.clone(),
201 data: data.to_vec(),
202 }).is_err() {
203 eprintln!("Websocket message deserialize error");
204 let _ = tcp_stream.shutdown(Shutdown::Both);
205 let _ = tx_socket.send(Vec::new());
206 };
207 },
208 Ok(WebSocketMessage::Close) => {
209 let _ = tcp_stream.shutdown(Shutdown::Both);
210 }
211 Err(e) => {
212 eprintln!("Websocket error {:?}", e);
213 let _ = tcp_stream.shutdown(Shutdown::Both);
214 let _ = tx_socket.send(Vec::new());
215 }
216 }
217 });
218 }
219 Err(_) => {
220 println!("Websocket closed");
221 let _ = tcp_stream.shutdown(Shutdown::Both);
222 let _ = tx_socket.send(Vec::new());
223 break;
224 }
225 }
226 }
227
228 let _ = http_server.request.send(HttpServerRequest::DisconnectWebSocket {
229 web_socket_id,
230 });
231}
232
233fn handle_get(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
234 let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
236 if http_server.request.send(HttpServerRequest::Get {
237 headers,
238 response_sender: tx_socket
239 }).is_err() {
240 return http_error_out(tcp_stream, 500);
241 };
242
243 if let Ok(response) = rx_socket.recv() {
244 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
245 write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
246 }
247 let _ = tcp_stream.shutdown(Shutdown::Both);
248}