liquid-ml 0.1.0

A university project to build a distributed compute system for UDFs
Documentation
//! Represents a server node in a distributed system, with implementations
//! provided for `LiquidML` use cases.
use crate::error::LiquidError;
use crate::network::{message, Connection, ControlMsg, Message, MessageCodec};
use log::info;
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::io::split;
use tokio::net::TcpListener;
use tokio_util::codec::{FramedRead, FramedWrite};

/// Represents a registration `Server` in a distributed system.
#[derive(Debug)]
pub struct Server {
    /// The `address` of this `Server`
    pub(crate) address: SocketAddr,
    /// The id of the current message
    pub(crate) msg_id: usize,
    /// A directory which is a `HashMap` of network names to that network,
    /// (a `HashMap` of `node_id` to a [`Connection`]).
    ///
    /// [`Connection`]: struct.Connection.html
    pub(crate) directory:
        HashMap<String, HashMap<usize, Connection<ControlMsg>>>,
}

impl Server {
    /// Create a new `Server` running on the given `address` in the format of
    /// `IP:Port`.
    pub async fn new(address: &str) -> Result<Self, LiquidError> {
        Ok(Server {
            msg_id: 0,
            directory: HashMap::new(),
            address: address.parse().unwrap(),
        })
    }

    /// A blocking function that allows a `Server` to listen for connections
    /// from newly started [`Client`]s. When a new [`Client`] connects to this
    /// `Server`, we add the connection to our directory for sending
    /// `ControlMsg::Kill` messages, but do not listen for further messages
    /// from the [`Client`] since this is not required for performing simple
    /// registration.
    ///
    /// [`Client`]: struct.Client.html
    pub async fn accept_new_connections(&mut self) -> Result<(), LiquidError> {
        let mut listener = TcpListener::bind(&self.address).await?;
        loop {
            // wait on connections from new clients
            let (socket, _) = listener.accept().await?;
            let (reader, writer) = split(socket);
            let mut stream = FramedRead::new(reader, MessageCodec::new());
            let sink = FramedWrite::new(writer, MessageCodec::new());
            // Receive the listening IP:Port address of the new client
            let address = message::read_msg(&mut stream).await?;
            let (address, network_name) = if let ControlMsg::Introduction {
                address,
                network_name,
            } = address.msg
            {
                (address, network_name)
            } else {
                return Err(LiquidError::UnexpectedMessage);
            };
            let conn = Connection { address, sink };

            let target_id;
            let dir;
            match self.directory.get_mut(&network_name) {
                Some(d) => {
                    // there are some existing clients of this type
                    target_id = d.len() + 1; // node id's start at 1
                    dir = d.iter().map(|(k, v)| (*k, v.address)).collect();
                    d.insert(target_id, conn);
                }
                None => {
                    target_id = 1;
                    dir = Vec::new();
                    let mut d = HashMap::new();
                    d.insert(target_id, conn);
                    self.directory.insert(network_name.clone(), d);
                }
            };

            info!(
                "Connected to address: {:#?} joining network {:#?}, assigning id: {:#?}",
                &address,
                &network_name,
                target_id
            );

            // Send the new client the list of existing nodes.
            let dir_msg = ControlMsg::Directory { dir };
            self.send_msg(target_id, &network_name, dir_msg).await?;
        }
    }

    /// Send the given `message` to a [`Client`] running in the network with
    /// the given `network_name` and with the given `target_id`.
    ///
    /// [`Client`]: struct.Client.html
    pub async fn send_msg(
        &mut self,
        target_id: usize,
        network_name: &str,
        message: ControlMsg,
    ) -> Result<(), LiquidError> {
        let m = Message::new(self.msg_id, 0, target_id, message);
        message::send_msg(
            target_id,
            m,
            self.directory.get_mut(network_name).unwrap(),
        )
        .await?;
        self.msg_id += 1;
        Ok(())
    }

    /// Broadcast the given `message` to all currently connected [`Clients`]
    /// in the network with the given `network_name`
    ///
    /// [`Client`]: struct.Client.html
    pub async fn broadcast(
        &mut self,
        message: ControlMsg,
        network_name: &str,
    ) -> Result<(), LiquidError> {
        let d: Vec<usize> = self
            .directory
            .iter()
            .find(|(k, _)| **k == network_name)
            .unwrap()
            .1
            .iter()
            .map(|(k, _)| *k)
            .collect();
        for k in d {
            self.send_msg(k, network_name, message.clone()).await?;
        }
        Ok(())
    }
}