new-home-proxy 0.1.2

This is a part of the New Home IoT System. It is used to make the core available in the www.
use std::str::{from_utf8, FromStr};
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use std::time::{Duration, SystemTime};

use actix::io::{SinkWrite, WriteHandler};
use actix::{Actor, ActorContext, AsyncContext, Context, Handler, StreamHandler, WrapFuture};
use actix_codec::Framed;
use actix_http::http::header::{CONTENT_TYPE, ORIGIN};
use actix_http::http::{HeaderValue, Method};
use actix_http::ws::Codec;
use actix_rt::System;
use actix_web::client::Client;
use actix_web::web::Bytes;
use actix_web_actors::ws::{Frame, Message};
use awc::error::WsProtocolError;
use awc::BoxedSocket;
use futures_util::stream::SplitSink;
use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS};
use serde_json::to_string as json_to_string;
use serde_json::Value;

use crate::auth::user_authenticator::UserAuthenticator;
use crate::communication::{
    ProxyRequest, ProxyResponse, WebsocketCommand, WebsocketSignal, WrappedMessage,
};

fn error_response(message: impl ToString) -> ProxyResponse {
    ProxyResponse {
        response: json!({
            "success": false,
            "message": [{
                "error": message.to_string()
            }]
        }),
        content_type: String::from("application/json"),
    }
}

pub struct WebsocketHandler {
    last_ping: SystemTime,
    core_host: String,
    writer: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
    user_authenticator: Arc<UserAuthenticator>,
}

impl WebsocketHandler {
    pub fn new(
        core_host: String,
        writer: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
        user_authenticator: Arc<UserAuthenticator>,
    ) -> Self {
        Self {
            last_ping: SystemTime::now(),
            core_host,
            writer,
            user_authenticator,
        }
    }

    async fn handle_request(
        core_host: String,
        request: ProxyRequest,
        user_authenticator: Arc<UserAuthenticator>,
    ) -> ProxyResponse {
        if !user_authenticator
            .validate_credentials(request.username.clone(), request.password.clone())
        {
            return error_response("Username and/or Password is invalid");
        }

        const FRAGMENT: &AsciiSet = &CONTROLS.add(b' ').add(b'"').add(b'<').add(b'>').add(b'`');
        let url = utf8_percent_encode(
            format!("{}/{}", &core_host, &request.url).as_str(),
            FRAGMENT,
        )
        .to_string();

        let client = Client::default();
        let mut client_request = client.request(
            Method::from_str(request.method.as_str()).unwrap_or(Method::GET),
            url,
        );

        client_request = client_request.header(ORIGIN, request.origin.clone());

        println!("{:?}", &request.request_body);

        let mut core_response = match client_request.send_json(&request.request_body).await {
            Ok(response) => response,
            _ => return error_response("Could not reach core"),
        };

        match core_response.body().await {
            Ok(response) => {
                let response = String::from(from_utf8(response.as_ref()).unwrap_or(""));

                let content_type = core_response
                    .headers()
                    .get(CONTENT_TYPE)
                    .unwrap_or(&HeaderValue::from_str("text/plain").unwrap())
                    .to_str()
                    .unwrap_or("text/plain")
                    .to_string();

                if content_type == "application/json" {
                    match serde_json::from_str::<Value>(&response) {
                        Ok(response) => {
                            return ProxyResponse {
                                response,
                                content_type: String::from("application/json"),
                            }
                        }
                        _ => {}
                    };
                }

                ProxyResponse {
                    response: Value::String(response),
                    content_type,
                }
            }
            _ => error_response("Could not receive core body"),
        }
    }
}

impl Actor for WebsocketHandler {
    type Context = Context<Self>;
}

impl StreamHandler<Result<Frame, WsProtocolError>> for WebsocketHandler {
    fn handle(&mut self, message: Result<Frame, WsProtocolError>, ctx: &mut Self::Context) {
        match message {
            Ok(Frame::Text(message)) => {
                let message = match from_utf8(message.as_ref()) {
                    Ok(string) => String::from(string),
                    _ => return,
                };

                match serde_json::from_str::<WrappedMessage<ProxyRequest>>(&message) {
                    Ok(request) => {
                        let future = Box::pin(Self::handle_request(
                            self.core_host.clone(),
                            request.clone().into_inner(),
                            self.user_authenticator.clone(),
                        ));
                        let addr = ctx.address();

                        ctx.wait(
                            (async move {
                                let response: ProxyResponse = future.await;
                                let response = request.answer(response);

                                addr.do_send(response);
                            })
                            .into_actor(self),
                        );
                    }
                    _ => {}
                }
            }
            Ok(Frame::Close(_)) => {
                println!("Connection closed by server");

                ctx.stop();
                System::current().stop();
            }
            Ok(Frame::Pong(_)) => {
                self.last_ping = SystemTime::now();
            }
            _ => {}
        }
    }

    fn started(&mut self, ctx: &mut Self::Context) {
        let addr = ctx.address();

        thread::spawn(move || loop {
            if !addr.connected() {
                break;
            }

            addr.do_send(WebsocketCommand(WebsocketSignal::Ping));

            sleep(Duration::from_millis(1000));
        });

        println!("Started client");
    }

    fn finished(&mut self, ctx: &mut Self::Context) {
        ctx.stop();
        System::current().stop();

        println!("Client stopped");
    }
}

impl Handler<WrappedMessage<ProxyResponse>> for WebsocketHandler {
    type Result = ();

    fn handle(
        &mut self,
        response: WrappedMessage<ProxyResponse>,
        _: &mut Self::Context,
    ) -> Self::Result {
        self.writer
            .write(Message::Text(json_to_string(&response).unwrap()))
            .unwrap_or_default();
    }
}

impl Handler<WebsocketCommand> for WebsocketHandler {
    type Result = ();

    fn handle(&mut self, command: WebsocketCommand, _: &mut Self::Context) -> Self::Result {
        match command.0 {
            WebsocketSignal::Close => self.writer.write(Message::Close(None)).unwrap_or_default(),
            WebsocketSignal::CheckPing => {
                let ping_age = SystemTime::now()
                    .duration_since(self.last_ping)
                    .unwrap_or(Duration::from_secs(99))
                    .as_millis();

                if ping_age > 1500 {
                    self.writer.write(Message::Close(None)).unwrap_or_default();
                }
            }
            WebsocketSignal::Ping => {
                self.writer
                    .write(Message::Ping(Bytes::new()))
                    .unwrap_or_default();
            }
        }
    }
}

impl WriteHandler<WsProtocolError> for WebsocketHandler {}