new-home-proxy 0.1.2

This is part of the New Home IoT System. It is used to make the core reachable from the World Wide Web.
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Mutex, RwLock};

use actix::clock::Duration;
use actix::Addr;

use crate::communication::{
    ProxyRequest, ProxyResponse, WebsocketCommand, WebsocketSignal, WrappedMessage,
};
use crate::proxy_error::{ProxyError, ProxyResult};
use crate::server::websocket_handler::WebsocketHandler;

/// Keeps track of the connected websocket clients and of the proxy requests that are still
/// waiting for a response from one of them.
///
/// Both maps sit behind a `RwLock`, so they can be modified through a shared reference.
#[derive(Default)]
pub struct ClientManager {
    /// Address of the websocket handler actor for every registered client id
    clients: RwLock<HashMap<String, Addr<WebsocketHandler>>>,
    /// Response channel of every pending request, keyed by the id of the wrapped message
    requests: RwLock<HashMap<usize, Mutex<Sender<ProxyResponse>>>>,
}

impl ClientManager {
    pub fn new() -> Self {
        Self {
            clients: Default::default(),
            requests: Default::default(),
        }
    }

    /// Sends a request to a client and blocks until its response arrives
    ///
    /// The request is wrapped into a `WrappedMessage` and a response channel is stored under
    /// the id of that message, so that `response` can hand the answer back to this call.
    ///
    /// If no response arrives within 60 seconds the pending request is discarded and an `Ok`
    /// response is returned whose JSON body has `"success": false` and a timeout message.
    /// A timeout is therefore NOT reported as an `Err`.
    ///
    /// # Errors
    ///
    /// - when the client is not registered
    ///
    pub fn send_request(
        &self,
        client_id: &String,
        request: ProxyRequest,
    ) -> ProxyResult<ProxyResponse> {
        // Look the client up exactly once. A separate `contains_key` check followed by
        // `get(..).unwrap()` could panic when the client unregisters in between.
        let client = match self.clients.read().unwrap().get(client_id) {
            Some(address) => address.clone(),
            None => {
                return Err(ProxyError::message(format!(
                    "Client {} is not registered",
                    client_id
                )));
            }
        };

        let (send, recv) = channel::<ProxyResponse>();
        let message = WrappedMessage::new(request);
        // Keep the id around: the message itself is moved into `do_send` below, but the id
        // is still needed to clean up after a timeout.
        let message_id = message.id;

        self.requests
            .write()
            .unwrap()
            .insert(message_id, Mutex::new(send));

        client.do_send(message);

        let response = match recv.recv_timeout(Duration::from_secs(60)) {
            Ok(response) => response,
            Err(_) => {
                // Nobody waits for this response any more, so the sender must not stay in
                // the map forever.
                self.requests.write().unwrap().remove(&message_id);

                ProxyResponse {
                    response: json!({
                        "success": false,
                        "message": [{
                            "error": "Client request timed out"
                        }]
                    }),
                    content_type: String::from("application/json"),
                }
            }
        };

        Ok(response)
    }

    /// Tells whether a client is currently registered under the given id
    ///
    /// Returns `true` when the id is taken and `false` when it is still available.
    ///
    /// # Example
    ///
    /// ```
    /// use new_home_proxy::server::client_manager::ClientManager;
    ///
    /// let manager = ClientManager::new();
    /// assert_eq!(manager.is_client_registered(&String::from("bob")), false);
    /// ```
    ///
    pub fn is_client_registered(&self, client_id: &String) -> bool {
        let clients = self.clients.read().unwrap();

        clients.get(client_id).is_some()
    }

    /// Registers a client and its address
    ///
    /// The check for an existing registration and the insertion happen under one write lock,
    /// so two concurrent registrations of the same id cannot both succeed.
    ///
    /// # Example
    ///
    /// ```
    /// use new_home_proxy::server::client_manager::ClientManager;
    /// use new_home_proxy::server::websocket_handler::WebsocketHandler;
    /// use actix::Addr;
    /// use actix::dev::channel::channel;
    ///
    /// let manager = ClientManager::new();
    ///
    /// let (sender, _) = channel::<WebsocketHandler>(1);
    /// assert_eq!(
    ///     manager.register_client(String::from("Some client"), Addr::new(sender)).is_ok(),
    ///     true
    /// );
    ///
    /// let (sender, _) = channel::<WebsocketHandler>(1);
    /// assert_eq!(
    ///     manager.register_client(String::from("Some client"), Addr::new(sender)).is_ok(),
    ///     false
    /// );
    /// ```
    ///
    /// # Errors
    ///
    /// - when the client_id is already taken
    ///
    pub fn register_client(
        &self,
        client_id: String,
        handler: Addr<WebsocketHandler>,
    ) -> ProxyResult<()> {
        // Hold the write lock across check AND insert. Checking under a read lock and
        // inserting under a separate write lock would let a second registration slip in
        // between and silently overwrite the first handler.
        let mut clients = self.clients.write().unwrap();

        if clients.contains_key(&client_id) {
            return Err(ProxyError::message(format!(
                "Client {} is already registered",
                &client_id
            )));
        }

        clients.insert(client_id, handler);

        Ok(())
    }

    /// Unregisters a client and its address
    ///
    /// The lookup and the removal are a single operation under the write lock, so the result
    /// always reflects whether THIS call removed the client.
    ///
    /// # Errors
    ///
    /// - when the client id is not registered
    ///
    pub fn unregister_client(&self, client_id: &String) -> ProxyResult<()> {
        // `remove` already reports whether the key existed, so no separate
        // `contains_key` check (and no window between check and removal) is needed.
        let removed = self.clients.write().unwrap().remove(client_id);

        match removed {
            Some(_) => Ok(()),
            None => Err(ProxyError::message(format!(
                "The client {} is not registered",
                client_id
            ))),
        }
    }

    /// Sends a `WebsocketCommand` carrying `WebsocketSignal::CheckPing` to the handler of
    /// every registered client
    ///
    /// The read lock on the client map is held while the commands are sent.
    pub fn ping_all_clients(&self) {
        let clients = self.clients.read().unwrap();

        clients
            .values()
            .for_each(|address| address.do_send(WebsocketCommand(WebsocketSignal::CheckPing)));
    }

    /// Sends the response for the given message id to the registered request channel.
    ///
    /// The sender is removed from the pending requests before the response is sent, so it is
    /// gone afterwards whether or not sending succeeded. A sender whose receiver has already
    /// been dropped can never deliver anything and would otherwise stay in the map forever.
    ///
    /// # Errors
    ///
    /// - when there is no sender registered for the given message_id
    /// - when the message could not be sent
    ///
    pub(crate) fn response(&self, message_id: usize, response: ProxyResponse) -> ProxyResult<()> {
        // Take ownership of the sender under a single, short write lock. The guard is a
        // temporary of this statement and is released before the response is sent.
        let sender = match self.requests.write().unwrap().remove(&message_id) {
            Some(sender) => sender,
            None => {
                return Err(ProxyError::message(format!(
                    "Sender for message_id {} is not registered",
                    &message_id
                )));
            }
        };

        // The sender is owned now, so the mutex can be unwrapped instead of locked.
        sender.into_inner().unwrap().send(response)?;

        Ok(())
    }
}