liquid_ml/network/
server.rs

1//! Represents a server node in a distributed system, with implementations
2//! provided for `LiquidML` use cases.
3use crate::error::LiquidError;
4use crate::network::{message, Connection, ControlMsg, Message, MessageCodec};
5use log::info;
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use tokio::io::split;
9use tokio::net::TcpListener;
10use tokio_util::codec::{FramedRead, FramedWrite};
11
12/// Represents a registration `Server` in a distributed system.
13#[derive(Debug)]
14pub struct Server {
15    /// The `address` of this `Server`
16    pub(crate) address: SocketAddr,
17    /// The id of the current message
18    pub(crate) msg_id: usize,
19    /// A directory which is a `HashMap` of network names to that network,
20    /// (a `HashMap` of `node_id` to a [`Connection`]).
21    ///
22    /// [`Connection`]: struct.Connection.html
23    pub(crate) directory:
24        HashMap<String, HashMap<usize, Connection<ControlMsg>>>,
25}
26
27impl Server {
28    /// Create a new `Server` running on the given `address` in the format of
29    /// `IP:Port`.
30    pub async fn new(address: &str) -> Result<Self, LiquidError> {
31        Ok(Server {
32            msg_id: 0,
33            directory: HashMap::new(),
34            address: address.parse().unwrap(),
35        })
36    }
37
38    /// A blocking function that allows a `Server` to listen for connections
39    /// from newly started [`Client`]s. When a new [`Client`] connects to this
40    /// `Server`, we add the connection to our directory for sending
41    /// `ControlMsg::Kill` messages, but do not listen for further messages
42    /// from the [`Client`] since this is not required for performing simple
43    /// registration.
44    ///
45    /// [`Client`]: struct.Client.html
46    pub async fn accept_new_connections(&mut self) -> Result<(), LiquidError> {
47        let mut listener = TcpListener::bind(&self.address).await?;
48        loop {
49            // wait on connections from new clients
50            let (socket, _) = listener.accept().await?;
51            let (reader, writer) = split(socket);
52            let mut stream = FramedRead::new(reader, MessageCodec::new());
53            let sink = FramedWrite::new(writer, MessageCodec::new());
54            // Receive the listening IP:Port address of the new client
55            let address = message::read_msg(&mut stream).await?;
56            let (address, network_name) = if let ControlMsg::Introduction {
57                address,
58                network_name,
59            } = address.msg
60            {
61                (address, network_name)
62            } else {
63                return Err(LiquidError::UnexpectedMessage);
64            };
65            let conn = Connection { address, sink };
66
67            let target_id;
68            let dir;
69            match self.directory.get_mut(&network_name) {
70                Some(d) => {
71                    // there are some existing clients of this type
72                    target_id = d.len() + 1; // node id's start at 1
73                    dir = d.iter().map(|(k, v)| (*k, v.address)).collect();
74                    d.insert(target_id, conn);
75                }
76                None => {
77                    target_id = 1;
78                    dir = Vec::new();
79                    let mut d = HashMap::new();
80                    d.insert(target_id, conn);
81                    self.directory.insert(network_name.clone(), d);
82                }
83            };
84
85            info!(
86                "Connected to address: {:#?} joining network {:#?}, assigning id: {:#?}",
87                &address,
88                &network_name,
89                target_id
90            );
91
92            // Send the new client the list of existing nodes.
93            let dir_msg = ControlMsg::Directory { dir };
94            self.send_msg(target_id, &network_name, dir_msg).await?;
95        }
96    }
97
98    /// Send the given `message` to a [`Client`] running in the network with
99    /// the given `network_name` and with the given `target_id`.
100    ///
101    /// [`Client`]: struct.Client.html
102    pub async fn send_msg(
103        &mut self,
104        target_id: usize,
105        network_name: &str,
106        message: ControlMsg,
107    ) -> Result<(), LiquidError> {
108        let m = Message::new(self.msg_id, 0, target_id, message);
109        message::send_msg(
110            target_id,
111            m,
112            self.directory.get_mut(network_name).unwrap(),
113        )
114        .await?;
115        self.msg_id += 1;
116        Ok(())
117    }
118
119    /// Broadcast the given `message` to all currently connected [`Clients`]
120    /// in the network with the given `network_name`
121    ///
122    /// [`Client`]: struct.Client.html
123    pub async fn broadcast(
124        &mut self,
125        message: ControlMsg,
126        network_name: &str,
127    ) -> Result<(), LiquidError> {
128        let d: Vec<usize> = self
129            .directory
130            .iter()
131            .find(|(k, _)| **k == network_name)
132            .unwrap()
133            .1
134            .iter()
135            .map(|(k, _)| *k)
136            .collect();
137        for k in d {
138            self.send_msg(k, network_name, message.clone()).await?;
139        }
140        Ok(())
141    }
142}