use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
use std::io::prelude::*;
use std::sync::{mpsc, mpsc::{RecvTimeoutError}};
use std::time::Duration;
pub use crate::websocket::{SERVER_WEB_SOCKET_PONG_MESSAGE, ServerWebSocket, ServerWebSocketMessage, ServerWebSocketMessageFormat, ServerWebSocketMessageHeader, SERVER_WEB_SOCKET_PING_MESSAGE};
use crate::utils::*;
#[derive(Clone)]
pub struct HttpServer {
pub listen_address: SocketAddr,
pub request: mpsc::Sender<HttpServerRequest>,
pub post_max_size: u64
}
pub struct HttpServerResponse {
pub header: String,
pub body: Vec<u8>
}
pub enum HttpServerRequest {
ConnectWebSocket {
web_socket_id: u64,
headers:HttpServerHeaders,
response_sender: mpsc::Sender<Vec<u8 >>,
},
DisconnectWebSocket {
web_socket_id: u64,
},
BinaryMessage {
web_socket_id: u64,
response_sender: mpsc::Sender<Vec<u8 >>,
data: Vec<u8>
},
Get {
headers: HttpServerHeaders,
response_sender: mpsc::Sender<HttpServerResponse>,
},
Post {
headers: HttpServerHeaders,
body: Vec<u8>,
response: mpsc::Sender<HttpServerResponse>,
}
}
pub fn start_http_server(
http_server: HttpServer,
) -> Option<std::thread::JoinHandle<() >> {
let listener = if let Ok(listener) = TcpListener::bind(http_server.listen_address) {listener} else {println!("Cannot bind http server port"); return None};
let listen_thread = {
std::thread::spawn(move || {
let mut connection_counter = 0u64;
for tcp_stream in listener.incoming() {
let mut tcp_stream = if let Ok(tcp_stream) = tcp_stream {
tcp_stream
}
else {
println!("Incoming stream failure");
continue
};
let http_server = http_server.clone();
connection_counter += 1;
let _read_thread = std::thread::spawn(move || {
let headers = HttpServerHeaders::from_tcp_stream(&mut tcp_stream);
if headers.is_none() {
return http_error_out(tcp_stream, 500);
}
let headers = headers.unwrap();
if headers.sec_websocket_key.is_some() {
return handle_web_socket(http_server, tcp_stream, headers, connection_counter);
}
if headers.verb == "POST" {
return handle_post(http_server, tcp_stream, headers);
}
if headers.verb == "GET" {
return handle_get(http_server, tcp_stream, headers);
}
http_error_out(tcp_stream, 500)
});
}
})
};
Some(listen_thread)
}
fn handle_post(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
if headers.content_length.is_none() {
return http_error_out(tcp_stream, 500);
}
let content_length = headers.content_length.unwrap();
if content_length > http_server.post_max_size {
return http_error_out(tcp_stream, 500);
}
let bytes_total = content_length as usize;
let mut body = Vec::new();
body.resize(bytes_total, 0u8);
let mut bytes_left = bytes_total;
while bytes_left > 0 {
let buf = &mut body[(bytes_total - bytes_left)..bytes_total];
let bytes_read = tcp_stream.read(buf);
if bytes_read.is_err() {
return http_error_out(tcp_stream, 500);
}
let bytes_read = bytes_read.unwrap();
if bytes_read == 0 {
return http_error_out(tcp_stream, 500);
}
bytes_left -= bytes_read;
}
let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
if http_server.request.send(HttpServerRequest::Post {
headers,
body,
response: tx_socket
}).is_err() {
return http_error_out(tcp_stream, 500);
};
if let Ok(response) = rx_socket.recv() {
write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
}
let _ = tcp_stream.shutdown(Shutdown::Both);
}
fn handle_web_socket(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders, web_socket_id: u64) {
let upgrade_response = ServerWebSocket::create_upgrade_response(headers.sec_websocket_key.as_ref().unwrap());
write_bytes_to_tcp_stream_no_error(&mut tcp_stream, upgrade_response.as_bytes());
let mut write_tcp_stream = tcp_stream.try_clone().unwrap();
let (tx_socket, rx_socket) = mpsc::channel::<Vec<u8 >> ();
let _write_thread = std::thread::spawn(move || {
loop{
match rx_socket.recv_timeout(Duration::from_millis(2000)){
Ok(data)=>{
if data.is_empty(){
break
}
let header = ServerWebSocketMessageHeader::from_len(data.len(), ServerWebSocketMessageFormat::Binary, false);
write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, header.as_slice());
write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &data);
},
Err(RecvTimeoutError::Timeout)=>{
write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &SERVER_WEB_SOCKET_PING_MESSAGE);
}
Err(RecvTimeoutError::Disconnected)=>{
break
}
}
}
let _ = write_tcp_stream.shutdown(Shutdown::Both);
});
if http_server.request.send(HttpServerRequest::ConnectWebSocket {
headers,
web_socket_id,
response_sender: tx_socket.clone()
}).is_err() {
let _ = tcp_stream.shutdown(Shutdown::Both);
return
};
let mut web_socket = ServerWebSocket::new();
loop {
let mut data = [0u8; 65535];
match tcp_stream.read(&mut data) {
Ok(n) => {
if n == 0 {
let _ = tcp_stream.shutdown(Shutdown::Both);
let _ = tx_socket.send(Vec::new());
break
}
web_socket.parse(&data[0..n], | result | {
match result {
Ok(ServerWebSocketMessage::Ping(_)) => {
let _ = tx_socket.send(SERVER_WEB_SOCKET_PONG_MESSAGE.to_vec());
},
Ok(ServerWebSocketMessage::Pong(_)) => {
},
Ok(ServerWebSocketMessage::Text(_text)) => {
}
Ok(ServerWebSocketMessage::Binary(data)) => {
if http_server.request.send(HttpServerRequest::BinaryMessage {
web_socket_id,
response_sender: tx_socket.clone(),
data: data.to_vec(),
}).is_err() {
eprintln!("Websocket message deserialize error");
let _ = tcp_stream.shutdown(Shutdown::Both);
let _ = tx_socket.send(Vec::new());
};
},
Ok(ServerWebSocketMessage::Close) => {
let _ = tcp_stream.shutdown(Shutdown::Both);
}
Err(e) => {
eprintln!("Websocket error {:?}", e);
let _ = tcp_stream.shutdown(Shutdown::Both);
let _ = tx_socket.send(Vec::new());
}
}
});
}
Err(_) => {
println!("Websocket closed");
let _ = tcp_stream.shutdown(Shutdown::Both);
let _ = tx_socket.send(Vec::new());
break;
}
}
}
let _ = http_server.request.send(HttpServerRequest::DisconnectWebSocket {
web_socket_id,
});
}
fn handle_get(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
if http_server.request.send(HttpServerRequest::Get {
headers,
response_sender: tx_socket
}).is_err() {
return http_error_out(tcp_stream, 500);
};
if let Ok(response) = rx_socket.recv() {
write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
}
let _ = tcp_stream.shutdown(Shutdown::Both);
}