1use sustenet_shared as shared;
2
3use std::net::{ IpAddr, Ipv4Addr };
4use std::str::FromStr;
5use std::sync::{ Arc, LazyLock };
6
7use tokio::io::{ AsyncReadExt, AsyncWriteExt, BufReader };
8use tokio::net::TcpStream;
9use tokio::sync::mpsc::Sender;
10use tokio::sync::{ RwLock, mpsc };
11
12use sustenet_shared::ClientPlugin;
13use shared::logging::{ LogType, Logger };
14use shared::packets::cluster::ToClient;
15use shared::packets::master::ToUnknown;
16use shared::utils::constants::{ DEFAULT_IP, MASTER_PORT };
17use shared::{ lread_string, lselect };
18
19lazy_static::lazy_static! {
20 pub static ref CLUSTER_SERVERS: Arc<RwLock<Vec<ClusterInfo>>> = Arc::new(
21 RwLock::new(Vec::new())
22 );
23 pub static ref CONNECTION: Arc<RwLock<Option<Connection>>> = Arc::new(
24 RwLock::new(
25 Some(Connection {
26 ip: get_ip(DEFAULT_IP),
27 port: MASTER_PORT,
28 connection_type: ConnectionType::MasterServer,
29 })
30 )
31 );
32}
33pub static LOGGER: LazyLock<Logger> = LazyLock::new(|| Logger::new(LogType::Cluster));
34
35#[derive(Debug, Clone)]
36pub struct ClusterInfo {
37 pub name: String,
38 pub ip: String,
39 pub port: u16,
40 pub max_connections: u32,
41}
42
43#[derive(Clone, Copy)]
44pub struct Connection {
45 pub ip: IpAddr,
46 pub port: u16,
47 pub connection_type: ConnectionType,
48}
49
50impl From<ClusterInfo> for Connection {
51 fn from(info: ClusterInfo) -> Self {
52 Connection {
53 ip: IpAddr::from_str(info.ip.as_str()).expect("Failed to parse the IP."),
54 port: info.port,
55 connection_type: ConnectionType::ClusterServer,
56 }
57 }
58}
59
60#[derive(Clone, Copy, Eq, PartialEq)]
61pub enum ConnectionType {
62 MasterServer,
63 ClusterServer,
64 None,
65}
66
67impl std::fmt::Display for ConnectionType {
68 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
69 match self {
70 ConnectionType::MasterServer => write!(f, "Master Server"),
71 ConnectionType::ClusterServer => write!(f, "Cluster Server"),
72 ConnectionType::None => write!(f, "Unknown"),
73 }
74 }
75}
76
77pub fn get_ip(ip: &str) -> IpAddr {
78 IpAddr::from_str(ip).unwrap_or(
79 IpAddr::from_str(DEFAULT_IP).unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST))
80 )
81}
82
83pub async fn cleanup() {}
84
85pub async fn start<P>(plugin: P) where P: ClientPlugin + Send + Sync + 'static {
86 let connection = *CONNECTION.read().await;
88 if connection.is_none() {
89 return;
90 }
91 let connection = connection.unwrap();
92
93 let ip = connection.ip;
94 let port = connection.port;
95 let connection_type = connection.connection_type;
96 {
97 *CONNECTION.write().await = None;
98 }
99
100 let (tx, mut rx) = mpsc::channel::<Box<[u8]>>(10);
101 plugin.set_sender(tx.clone());
102
103 let handler = tokio::spawn(async move {
104 LOGGER.warning(format!("Connecting to the {connection_type}...").as_str());
105 let mut stream = TcpStream::connect(format!("{}:{}", ip, port)).await.expect(
106 format!("Failed to connect to the {connection_type} at {ip}:{port}.").as_str()
107 );
108 LOGGER.success(format!("Connected to the {connection_type} at {ip}:{port}.").as_str());
109
110 let (reader, mut writer) = stream.split();
111 let mut reader = BufReader::new(reader);
112
113 lselect! {
114 command = reader.read_u8() => {
115 if command.is_err() {
116 continue;
117 }
118
119 LOGGER.info(format!("Received data: {:?}", command).as_str());
120
121 match connection_type {
122 ConnectionType::MasterServer => match command.unwrap() {
123 x if x == ToUnknown::SendClusters as u8 => {
124 let amount = match reader.read_u8().await {
125 Ok(amount) => amount,
126 Err(_) => {
127 LOGGER.error("Failed to read the amount of clusters.");
128 continue;
129 }
130 };
131
132 let mut cluster_servers_tmp = Vec::new();
133 for _ in 0..amount {
134 let name = lread_string!(reader, |msg| LOGGER.error(msg), "cluster name");
135 let ip = lread_string!(reader, |msg| LOGGER.error(msg), "cluster IP");
136 let port = match reader.read_u16().await {
137 Ok(port) => port,
138 Err(_) => {
139 LOGGER.error("Failed to read the cluster port.");
140 continue;
141 }
142 };
143 let max_connections = match reader.read_u32().await {
144 Ok(max_connections) => max_connections,
145 Err(_) => {
146 LOGGER.error("Failed to read the cluster max connections.");
147 continue;
148 }
149 };
150
151 cluster_servers_tmp.push(ClusterInfo {
152 name,
153 ip,
154 port,
155 max_connections,
156 });
157 }
158
159 {
160 {
161 let mut cluster_servers = CLUSTER_SERVERS.write().await;
162 *cluster_servers = cluster_servers_tmp;
163
164 LOGGER.success(format!("Received {amount} Cluster servers from the {connection_type}.").as_str());
165 println!("{:?}", *cluster_servers);
166 }
167 }
168 },
169 cmd => plugin.receive_master(tx.clone(), cmd, &mut reader).await,
170 }
171 ConnectionType::ClusterServer => match command.unwrap() {
172 x if x == ToClient::SendClusters as u8 => {
173 let amount = match reader.read_u8().await {
174 Ok(amount) => amount,
175 Err(_) => {
176 LOGGER.error("Failed to read the amount of clusters.");
177 continue;
178 }
179 };
180
181 let mut cluster_servers_tmp = Vec::new();
182 for _ in 0..amount {
183 let name = lread_string!(reader, |msg| LOGGER.error(msg), "cluster name");
184 let ip = lread_string!(reader, |msg| LOGGER.error(msg), "cluster IP");
185 let port = match reader.read_u16().await {
186 Ok(port) => port,
187 Err(_) => {
188 LOGGER.error("Failed to read the cluster port.");
189 continue;
190 }
191 };
192 let max_connections = match reader.read_u32().await {
193 Ok(max_connections) => max_connections,
194 Err(_) => {
195 LOGGER.error("Failed to read the cluster max connections.");
196 continue;
197 }
198 };
199
200 cluster_servers_tmp.push(ClusterInfo {
201 name,
202 ip,
203 port,
204 max_connections,
205 });
206 }
207
208 {
209 {
210 let mut cluster_servers = CLUSTER_SERVERS.write().await;
211 *cluster_servers = cluster_servers_tmp;
212
213 LOGGER.success(format!("Received {amount} Cluster servers from the {connection_type}.").as_str());
214 println!("{:?}", *cluster_servers);
215 }
216 }
217 },
218 x if x == ToClient::DisconnectCluster as u8 => todo!(),
219 x if x == ToClient::LeaveCluster as u8 => todo!(),
220
221 x if x == ToClient::VersionOfKey as u8 => todo!(),
222 x if x == ToClient::SendPubKey as u8 => todo!(),
223 x if x == ToClient::Authenticate as u8 => todo!(),
224
225 x if x == ToClient::Move as u8 => todo!(),
226 cmd => plugin.receive_cluster(tx.clone(), cmd, &mut reader).await,
227 }
228 _ => (),
229 }
230 }
231 result = rx.recv() => {
232 if let Some(data) = result {
233 if data.is_empty() {
234 writer.shutdown().await.expect("Failed to shutdown the writer.");
235 LOGGER.info("Closing connection...");
236 break;
237 }
238
239 writer.write_all(&data).await.expect("Failed to write to the Server.");
240 writer.flush().await.expect("Failed to flush the writer.");
241 LOGGER.info(format!("Sent {data:?} as data to the {connection_type}.").as_str());
242 } else {
243 writer.shutdown().await.expect("Failed to shutdown the writer.");
244 LOGGER.info("Shutting down connection...");
245 break;
246 }
247 }
248 }
249 });
250
251 let _ = handler.await;
252}
253
254pub async fn send_data(tx: &Sender<Box<[u8]>>, data: Box<[u8]>) {
255 tx.send(data).await.expect("Failed to send data to the Server.");
256}
257
258pub async fn join_cluster(tx: &Sender<Box<[u8]>>, id: usize) {
259 if id < (0 as usize) {
260 LOGGER.error("Failed to join a cluster. The cluster ID is invalid (less than 0).");
261 return;
262 }
263
264 let cluster_servers = CLUSTER_SERVERS.read().await;
265 if cluster_servers.is_empty() {
266 LOGGER.error("Failed to join a cluster. No cluster servers are available.");
267 return;
268 }
269
270 if id >= cluster_servers.len() {
271 LOGGER.error(
272 "Failed to join a cluster. The cluster ID is invalid (greater than the amount of clusters)."
273 );
274 return;
275 }
276
277 let cluster = (
278 match cluster_servers.get(id) {
279 Some(cluster) => cluster,
280 None => {
281 LOGGER.error("Failed to join a cluster. The cluster ID is invalid.");
282 return;
283 }
284 }
285 ).clone();
286
287 LOGGER.success(format!("Client is joining cluster {}", cluster.name).as_str());
288
289 let connection = match std::panic::catch_unwind(|| Connection::from(cluster)) {
290 Ok(connection) => connection,
291 Err(_) => {
292 LOGGER.error("Failed to create a connection with the Cluster Server.");
293 return;
294 }
295 };
296 {
297 *CONNECTION.write().await = Some(connection);
299 stop(tx).await;
300 }
301}
302
303async fn stop(tx: &Sender<Box<[u8]>>) {
304 tx.send(Box::new([])).await.expect("Failed to shutdown.");
305}