sustenet_client/
lib.rs

1use sustenet_shared as shared;
2
3use std::net::{ IpAddr, Ipv4Addr };
4use std::str::FromStr;
5use std::sync::{ Arc, LazyLock };
6
7use tokio::io::{ AsyncReadExt, AsyncWriteExt, BufReader };
8use tokio::net::TcpStream;
9use tokio::sync::mpsc::Sender;
10use tokio::sync::{ RwLock, mpsc };
11
12use sustenet_shared::ClientPlugin;
13use shared::logging::{ LogType, Logger };
14use shared::packets::cluster::ToClient;
15use shared::packets::master::ToUnknown;
16use shared::utils::constants::{ DEFAULT_IP, MASTER_PORT };
17use shared::{ lread_string, lselect };
18
19lazy_static::lazy_static! {
20    pub static ref CLUSTER_SERVERS: Arc<RwLock<Vec<ClusterInfo>>> = Arc::new(
21        RwLock::new(Vec::new())
22    );
23    pub static ref CONNECTION: Arc<RwLock<Option<Connection>>> = Arc::new(
24        RwLock::new(
25            Some(Connection {
26                ip: get_ip(DEFAULT_IP),
27                port: MASTER_PORT,
28                connection_type: ConnectionType::MasterServer,
29            })
30        )
31    );
32}
33pub static LOGGER: LazyLock<Logger> = LazyLock::new(|| Logger::new(LogType::Cluster));
34
/// Description of a single cluster server, as read from a `SendClusters` packet.
#[derive(Clone, Debug)]
pub struct ClusterInfo {
    /// Name of the cluster as sent by the server.
    pub name: String,
    /// Address of the cluster in textual form; it is parsed into an `IpAddr` only when the
    /// entry is turned into a `Connection`.
    pub ip: String,
    /// Port used when connecting to the cluster.
    pub port: u16,
    /// Connection limit reported for the cluster.
    pub max_connections: u32,
}
42
43#[derive(Clone, Copy)]
44pub struct Connection {
45    pub ip: IpAddr,
46    pub port: u16,
47    pub connection_type: ConnectionType,
48}
49
50impl From<ClusterInfo> for Connection {
51    fn from(info: ClusterInfo) -> Self {
52        Connection {
53            ip: IpAddr::from_str(info.ip.as_str()).expect("Failed to parse the IP."),
54            port: info.port,
55            connection_type: ConnectionType::ClusterServer,
56        }
57    }
58}
59
/// Kind of server a [`Connection`] points at.
///
/// `Debug` is derived so the type can be used in `assert_eq!`, `{:?}` formatting and in derived
/// `Debug` implementations of types that contain it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionType {
    /// The master server.
    MasterServer,
    /// A cluster server.
    ClusterServer,
    /// No known server kind; displayed as "Unknown".
    None,
}

impl std::fmt::Display for ConnectionType {
    /// Writes the human-readable name used in log messages.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ConnectionType::MasterServer => "Master Server",
            ConnectionType::ClusterServer => "Cluster Server",
            ConnectionType::None => "Unknown",
        };

        f.write_str(label)
    }
}
76
77pub fn get_ip(ip: &str) -> IpAddr {
78    IpAddr::from_str(ip).unwrap_or(
79        IpAddr::from_str(DEFAULT_IP).unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST))
80    )
81}
82
/// Teardown hook for the client. It currently performs no work and completes immediately.
pub async fn cleanup() {
    // Intentionally empty.
}
84
85pub async fn start<P>(plugin: P) where P: ClientPlugin + Send + Sync + 'static {
86    // Get the connection LOGGER.information.
87    let connection = *CONNECTION.read().await;
88    if connection.is_none() {
89        return;
90    }
91    let connection = connection.unwrap();
92
93    let ip = connection.ip;
94    let port = connection.port;
95    let connection_type = connection.connection_type;
96    {
97        *CONNECTION.write().await = None;
98    }
99
100    let (tx, mut rx) = mpsc::channel::<Box<[u8]>>(10);
101    plugin.set_sender(tx.clone());
102
103    let handler = tokio::spawn(async move {
104        LOGGER.warning(format!("Connecting to the {connection_type}...").as_str());
105        let mut stream = TcpStream::connect(format!("{}:{}", ip, port)).await.expect(
106            format!("Failed to connect to the {connection_type} at {ip}:{port}.").as_str()
107        );
108        LOGGER.success(format!("Connected to the {connection_type} at {ip}:{port}.").as_str());
109
110        let (reader, mut writer) = stream.split();
111        let mut reader = BufReader::new(reader);
112
113        lselect! {
114            command = reader.read_u8() => {
115                if command.is_err() {
116                    continue;
117                }
118
119                LOGGER.info(format!("Received data: {:?}", command).as_str());
120
121                match connection_type {
122                    ConnectionType::MasterServer => match command.unwrap() {
123                        x if x == ToUnknown::SendClusters as u8 => {
124                            let amount = match reader.read_u8().await {
125                                Ok(amount) => amount,
126                                Err(_) => {
127                                    LOGGER.error("Failed to read the amount of clusters.");
128                                    continue;
129                                }
130                            };
131
132                            let mut cluster_servers_tmp = Vec::new();
133                            for _ in 0..amount {
134                                let name = lread_string!(reader, |msg| LOGGER.error(msg), "cluster name");
135                                let ip = lread_string!(reader, |msg| LOGGER.error(msg), "cluster IP");
136                                let port = match reader.read_u16().await {
137                                    Ok(port) => port,
138                                    Err(_) => {
139                                        LOGGER.error("Failed to read the cluster port.");
140                                        continue;
141                                    }
142                                };
143                                let max_connections = match reader.read_u32().await {
144                                    Ok(max_connections) => max_connections,
145                                    Err(_) => {
146                                        LOGGER.error("Failed to read the cluster max connections.");
147                                        continue;
148                                    }
149                                };
150
151                                cluster_servers_tmp.push(ClusterInfo {
152                                    name,
153                                    ip,
154                                    port,
155                                    max_connections,
156                                });
157                            }
158
159                            {
160                                {
161                                    let mut cluster_servers = CLUSTER_SERVERS.write().await;
162                                    *cluster_servers = cluster_servers_tmp;
163
164                                    LOGGER.success(format!("Received {amount} Cluster servers from the {connection_type}.").as_str());
165                                    println!("{:?}", *cluster_servers);
166                                }
167                            }
168                        },
169                        cmd => plugin.receive_master(tx.clone(), cmd, &mut reader).await,
170                    }
171                    ConnectionType::ClusterServer => match command.unwrap() {
172                        x if x == ToClient::SendClusters as u8 => {
173                            let amount = match reader.read_u8().await {
174                                Ok(amount) => amount,
175                                Err(_) => {
176                                    LOGGER.error("Failed to read the amount of clusters.");
177                                    continue;
178                                }
179                            };
180
181                            let mut cluster_servers_tmp = Vec::new();
182                            for _ in 0..amount {
183                                let name = lread_string!(reader, |msg| LOGGER.error(msg), "cluster name");
184                                let ip = lread_string!(reader, |msg| LOGGER.error(msg), "cluster IP");
185                                let port = match reader.read_u16().await {
186                                    Ok(port) => port,
187                                    Err(_) => {
188                                        LOGGER.error("Failed to read the cluster port.");
189                                        continue;
190                                    }
191                                };
192                                let max_connections = match reader.read_u32().await {
193                                    Ok(max_connections) => max_connections,
194                                    Err(_) => {
195                                        LOGGER.error("Failed to read the cluster max connections.");
196                                        continue;
197                                    }
198                                };
199
200                                cluster_servers_tmp.push(ClusterInfo {
201                                    name,
202                                    ip,
203                                    port,
204                                    max_connections,
205                                });
206                            }
207
208                            {
209                                {
210                                    let mut cluster_servers = CLUSTER_SERVERS.write().await;
211                                    *cluster_servers = cluster_servers_tmp;
212
213                                    LOGGER.success(format!("Received {amount} Cluster servers from the {connection_type}.").as_str());
214                                    println!("{:?}", *cluster_servers);
215                                }
216                            }
217                        },
218                        x if x == ToClient::DisconnectCluster as u8 => todo!(),
219                        x if x == ToClient::LeaveCluster as u8 => todo!(),
220
221                        x if x == ToClient::VersionOfKey as u8 => todo!(),
222                        x if x == ToClient::SendPubKey as u8 => todo!(),
223                        x if x == ToClient::Authenticate as u8 => todo!(),
224
225                        x if x == ToClient::Move as u8 => todo!(),
226                        cmd => plugin.receive_cluster(tx.clone(), cmd, &mut reader).await,
227                    }
228                    _ => (),
229                }
230            }
231            result = rx.recv() => {
232                if let Some(data) = result {
233                    if data.is_empty() {
234                        writer.shutdown().await.expect("Failed to shutdown the writer.");
235                        LOGGER.info("Closing connection...");
236                        break;
237                    }
238
239                    writer.write_all(&data).await.expect("Failed to write to the Server.");
240                    writer.flush().await.expect("Failed to flush the writer.");
241                    LOGGER.info(format!("Sent {data:?} as data to the {connection_type}.").as_str());
242                } else {
243                    writer.shutdown().await.expect("Failed to shutdown the writer.");
244                    LOGGER.info("Shutting down connection...");
245                    break;
246                }
247            }
248        }
249    });
250
251    let _ = handler.await;
252}
253
254pub async fn send_data(tx: &Sender<Box<[u8]>>, data: Box<[u8]>) {
255    tx.send(data).await.expect("Failed to send data to the Server.");
256}
257
258pub async fn join_cluster(tx: &Sender<Box<[u8]>>, id: usize) {
259    if id < (0 as usize) {
260        LOGGER.error("Failed to join a cluster. The cluster ID is invalid (less than 0).");
261        return;
262    }
263
264    let cluster_servers = CLUSTER_SERVERS.read().await;
265    if cluster_servers.is_empty() {
266        LOGGER.error("Failed to join a cluster. No cluster servers are available.");
267        return;
268    }
269
270    if id >= cluster_servers.len() {
271        LOGGER.error(
272            "Failed to join a cluster. The cluster ID is invalid (greater than the amount of clusters)."
273        );
274        return;
275    }
276
277    let cluster = (
278        match cluster_servers.get(id) {
279            Some(cluster) => cluster,
280            None => {
281                LOGGER.error("Failed to join a cluster. The cluster ID is invalid.");
282                return;
283            }
284        }
285    ).clone();
286
287    LOGGER.success(format!("Client is joining cluster {}", cluster.name).as_str());
288
289    let connection = match std::panic::catch_unwind(|| Connection::from(cluster)) {
290        Ok(connection) => connection,
291        Err(_) => {
292            LOGGER.error("Failed to create a connection with the Cluster Server.");
293            return;
294        }
295    };
296    {
297        // Overwrite the current connection with the cluster connection.
298        *CONNECTION.write().await = Some(connection);
299        stop(tx).await;
300    }
301}
302
303async fn stop(tx: &Sender<Box<[u8]>>) {
304    tx.send(Box::new([])).await.expect("Failed to shutdown.");
305}