use std::str::{from_utf8, FromStr};
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use std::time::{Duration, SystemTime};
use actix::io::{SinkWrite, WriteHandler};
use actix::{Actor, ActorContext, AsyncContext, Context, Handler, StreamHandler, WrapFuture};
use actix_codec::Framed;
use actix_http::http::header::{CONTENT_TYPE, ORIGIN};
use actix_http::http::{HeaderValue, Method};
use actix_http::ws::Codec;
use actix_rt::System;
use actix_web::client::Client;
use actix_web::web::Bytes;
use actix_web_actors::ws::{Frame, Message};
use awc::error::WsProtocolError;
use awc::BoxedSocket;
use futures_util::stream::SplitSink;
use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS};
use serde_json::to_string as json_to_string;
use serde_json::Value;
use crate::auth::user_authenticator::UserAuthenticator;
use crate::communication::{
ProxyRequest, ProxyResponse, WebsocketCommand, WebsocketSignal, WrappedMessage,
};
/// Builds the JSON failure payload sent back to the requester.
///
/// The resulting body has the shape
/// `{"success": false, "message": [{"error": <message>}]}` and is tagged with
/// the `application/json` content type.
fn error_response(message: impl ToString) -> ProxyResponse {
    let error_text = message.to_string();
    let body = json!({
        "success": false,
        "message": [{
            "error": error_text
        }]
    });
    ProxyResponse {
        response: body,
        content_type: "application/json".to_owned(),
    }
}
/// Actor state for one websocket connection over which proxied requests are
/// received and their responses are written back.
pub struct WebsocketHandler {
/// Set to "now" on construction and refreshed whenever a `Pong` frame arrives;
/// `WebsocketSignal::CheckPing` compares its age against a timeout.
last_ping: SystemTime,
/// Prefix joined with the request's `url` (as `"{core_host}/{url}"`) to form
/// the address of the forwarded HTTP request.
core_host: String,
/// Write half of the websocket; all outgoing messages go through it.
writer: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
/// Shared authenticator used to validate the credentials carried by each request.
user_authenticator: Arc<UserAuthenticator>,
}
impl WebsocketHandler {
    /// Creates a handler for an established websocket connection.
    ///
    /// * `core_host` - prefix prepended to every proxied request URL.
    /// * `writer` - write half of the websocket used for all outgoing messages.
    /// * `user_authenticator` - shared authenticator used to validate request credentials.
    ///
    /// `last_ping` starts at the current time.
    pub fn new(
        core_host: String,
        writer: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
        user_authenticator: Arc<UserAuthenticator>,
    ) -> Self {
        Self {
            last_ping: SystemTime::now(),
            core_host,
            writer,
            user_authenticator,
        }
    }

    /// Returns `true` when a `Content-Type` header value denotes JSON.
    ///
    /// Only the media type itself is compared, case-insensitively; parameters
    /// such as `; charset=utf-8` and surrounding whitespace are ignored.
    fn is_json_content_type(content_type: &str) -> bool {
        content_type
            .split(';')
            .next()
            .map_or(false, |essence| {
                essence.trim().eq_ignore_ascii_case("application/json")
            })
    }

    /// Forwards a single proxied request to the core host and packages the answer.
    ///
    /// Steps:
    /// 1. Reject the request with an error payload if the credentials are invalid.
    /// 2. Build `"{core_host}/{url}"`, percent-encode it, and send the request
    ///    with the request's method (falling back to `GET` when the method
    ///    string cannot be parsed), its `Origin` header and its JSON body.
    /// 3. Return the body as parsed JSON when the response is declared as JSON
    ///    and parses successfully; otherwise return it as a plain string
    ///    together with the response's content type (`text/plain` when the
    ///    header is missing or not representable as a string).
    ///
    /// Transport failures are reported as error payloads
    /// (`"Could not reach core"` / `"Could not receive core body"`); this
    /// function never returns an `Err`.
    async fn handle_request(
        core_host: String,
        request: ProxyRequest,
        user_authenticator: Arc<UserAuthenticator>,
    ) -> ProxyResponse {
        // Characters that get percent-encoded in the outgoing URL.
        const FRAGMENT: &AsciiSet = &CONTROLS.add(b' ').add(b'"').add(b'<').add(b'>').add(b'`');

        if !user_authenticator
            .validate_credentials(request.username.clone(), request.password.clone())
        {
            return error_response("Username and/or Password is invalid");
        }

        let raw_url = format!("{}/{}", &core_host, &request.url);
        let url = utf8_percent_encode(raw_url.as_str(), FRAGMENT).to_string();

        let client = Client::default();
        let client_request = client
            .request(
                Method::from_str(request.method.as_str()).unwrap_or(Method::GET),
                url,
            )
            .header(ORIGIN, request.origin.clone());

        // NOTE(review): debug trace of the outgoing payload on stdout; kept to
        // preserve existing output, but consider removing or routing to a logger.
        println!("{:?}", &request.request_body);

        let mut core_response = match client_request.send_json(&request.request_body).await {
            Ok(response) => response,
            Err(_) => return error_response("Could not reach core"),
        };
        let body = match core_response.body().await {
            Ok(body) => body,
            Err(_) => return error_response("Could not receive core body"),
        };

        // A body that is not valid UTF-8 is forwarded as an empty string.
        let body_text = String::from(from_utf8(body.as_ref()).unwrap_or(""));
        let content_type = core_response
            .headers()
            .get(CONTENT_TYPE)
            .and_then(|value| value.to_str().ok())
            .unwrap_or("text/plain")
            .to_string();

        if Self::is_json_content_type(&content_type) {
            if let Ok(parsed) = serde_json::from_str::<Value>(&body_text) {
                return ProxyResponse {
                    response: parsed,
                    content_type: String::from("application/json"),
                };
            }
            // Declared as JSON but not parseable: fall through and forward the raw text.
        }

        ProxyResponse {
            response: Value::String(body_text),
            content_type,
        }
    }
}
/// Makes `WebsocketHandler` an actix actor that runs in a plain `Context`.
impl Actor for WebsocketHandler {
type Context = Context<Self>;
}
impl StreamHandler<Result<Frame, WsProtocolError>> for WebsocketHandler {
fn handle(&mut self, message: Result<Frame, WsProtocolError>, ctx: &mut Self::Context) {
match message {
Ok(Frame::Text(message)) => {
let message = match from_utf8(message.as_ref()) {
Ok(string) => String::from(string),
_ => return,
};
match serde_json::from_str::<WrappedMessage<ProxyRequest>>(&message) {
Ok(request) => {
let future = Box::pin(Self::handle_request(
self.core_host.clone(),
request.clone().into_inner(),
self.user_authenticator.clone(),
));
let addr = ctx.address();
ctx.wait(
(async move {
let response: ProxyResponse = future.await;
let response = request.answer(response);
addr.do_send(response);
})
.into_actor(self),
);
}
_ => {}
}
}
Ok(Frame::Close(_)) => {
println!("Connection closed by server");
ctx.stop();
System::current().stop();
}
Ok(Frame::Pong(_)) => {
self.last_ping = SystemTime::now();
}
_ => {}
}
}
fn started(&mut self, ctx: &mut Self::Context) {
let addr = ctx.address();
thread::spawn(move || loop {
if !addr.connected() {
break;
}
addr.do_send(WebsocketCommand(WebsocketSignal::Ping));
sleep(Duration::from_millis(1000));
});
println!("Started client");
}
fn finished(&mut self, ctx: &mut Self::Context) {
ctx.stop();
System::current().stop();
println!("Client stopped");
}
}
/// Sends a finished proxy response back over the websocket.
impl Handler<WrappedMessage<ProxyResponse>> for WebsocketHandler {
    type Result = ();

    /// Serializes `response` to JSON and writes it as a text message.
    ///
    /// A serialization failure is reported on stderr and the message is
    /// dropped instead of panicking inside the actor. Write errors are
    /// ignored (best-effort delivery).
    fn handle(
        &mut self,
        response: WrappedMessage<ProxyResponse>,
        _: &mut Self::Context,
    ) -> Self::Result {
        match json_to_string(&response) {
            Ok(payload) => self
                .writer
                .write(Message::Text(payload))
                .unwrap_or_default(),
            Err(error) => eprintln!("Could not serialize proxy response: {}", error),
        }
    }
}
impl Handler<WebsocketCommand> for WebsocketHandler {
type Result = ();
fn handle(&mut self, command: WebsocketCommand, _: &mut Self::Context) -> Self::Result {
match command.0 {
WebsocketSignal::Close => self.writer.write(Message::Close(None)).unwrap_or_default(),
WebsocketSignal::CheckPing => {
let ping_age = SystemTime::now()
.duration_since(self.last_ping)
.unwrap_or(Duration::from_secs(99))
.as_millis();
if ping_age > 1500 {
self.writer.write(Message::Close(None)).unwrap_or_default();
}
}
WebsocketSignal::Ping => {
self.writer
.write(Message::Ping(Bytes::new()))
.unwrap_or_default();
}
}
}
}
// Empty impl: relies entirely on the default methods of `WriteHandler`.
// NOTE(review): presumably required so this actor can own the `SinkWrite` held
// in `writer` - confirm against actix's `SinkWrite` bounds.
impl WriteHandler<WsProtocolError> for WebsocketHandler {}