1use std::io::Cursor;
2use std::net::{SocketAddr, ToSocketAddrs};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use once_cell::sync::Lazy;
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::net::{TcpStream, UdpSocket};
9use tokio::time::timeout;
10
11use crate::error::McError;
12use crate::models::*;
13
14static DNS_CACHE: Lazy<DashMap<String, (SocketAddr, SystemTime)>> = Lazy::new(DashMap::new);
15const DNS_CACHE_TTL: u64 = 300; #[derive(Clone)]
18pub struct McClient {
19 timeout: Duration,
20 max_parallel: usize,
21}
22
23impl Default for McClient {
24 fn default() -> Self {
25 Self {
26 timeout: Duration::from_secs(10),
27 max_parallel: 10,
28 }
29 }
30}
31
32impl McClient {
33 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn with_timeout(mut self, timeout: Duration) -> Self {
38 self.timeout = timeout;
39 self
40 }
41
42 pub fn with_max_parallel(mut self, max_parallel: usize) -> Self {
43 self.max_parallel = max_parallel;
44 self
45 }
46
47 pub async fn ping(&self, address: &str, edition: ServerEdition) -> Result<ServerStatus, McError> {
48 match edition {
49 ServerEdition::Java => self.ping_java(address).await,
50 ServerEdition::Bedrock => self.ping_bedrock(address).await,
51 }
52 }
53
54 pub async fn ping_java(&self, address: &str) -> Result<ServerStatus, McError> {
55 let start = SystemTime::now();
56 let (host, port) = Self::parse_address(address, 25565)?;
57 let resolved = self.resolve_dns(host, port).await?;
58 let dns_info = self.get_dns_info(host).await.ok(); let mut stream = timeout(self.timeout, TcpStream::connect(resolved))
61 .await
62 .map_err(|_| McError::Timeout)?
63 .map_err(|e| McError::ConnectionError(e.to_string()))?;
64
65 stream.set_nodelay(true).map_err(McError::IoError)?;
66
67 self.send_handshake(&mut stream, host, port).await?;
69
70 self.send_status_request(&mut stream).await?;
72
73 let response = self.read_response(&mut stream).await?;
75 let (json, latency) = self.parse_java_response(response, start)?;
76
77 Ok(ServerStatus {
79 online: true,
80 ip: resolved.ip().to_string(),
81 port: resolved.port(),
82 hostname: host.to_string(),
83 latency,
84 dns: dns_info,
85 data: ServerData::Java(self.parse_java_json(&json)?),
86 })
87 }
88
89 pub async fn ping_bedrock(&self, address: &str) -> Result<ServerStatus, McError> {
90 let start = SystemTime::now();
91 let (host, port) = Self::parse_address(address, 19132)?;
92 let resolved = self.resolve_dns(host, port).await?;
93 let dns_info = self.get_dns_info(host).await.ok(); let socket = UdpSocket::bind("0.0.0.0:0").await.map_err(McError::IoError)?;
96
97 let ping_packet = self.create_bedrock_ping_packet();
99 timeout(self.timeout, socket.send_to(&ping_packet, resolved))
100 .await
101 .map_err(|_| McError::Timeout)?
102 .map_err(|e| McError::IoError(e))?;
103
104 let mut buf = [0u8; 1024];
106 let (len, _) = timeout(self.timeout, socket.recv_from(&mut buf))
107 .await
108 .map_err(|_| McError::Timeout)?
109 .map_err(|e| McError::IoError(e))?;
110
111 if len < 35 {
112 return Err(McError::InvalidResponse("Response too short".to_string()));
113 }
114
115 let latency = start.elapsed()
116 .map_err(|_| McError::InvalidResponse("Time error".to_string()))?
117 .as_secs_f64() * 1000.0;
118
119 let pong_data = String::from_utf8_lossy(&buf[35..len]).to_string();
120
121 Ok(ServerStatus {
122 online: true,
123 ip: resolved.ip().to_string(),
124 port: resolved.port(),
125 hostname: host.to_string(),
126 latency,
127 dns: dns_info,
128 data: ServerData::Bedrock(self.parse_bedrock_response(&pong_data)?),
129 })
130 }
131
132 pub async fn ping_many(&self, servers: &[ServerInfo]) -> Vec<(ServerInfo, Result<ServerStatus, McError>)> {
133 use futures::stream::StreamExt;
134 use tokio::sync::Semaphore;
135
136 let semaphore = std::sync::Arc::new(Semaphore::new(self.max_parallel));
137 let client = self.clone();
138
139 let futures = servers.iter().map(|server| {
140 let server = server.clone();
141 let semaphore = semaphore.clone();
142 let client = client.clone();
143
144 async move {
145 let _permit = semaphore.acquire().await;
146 let result = client.ping(&server.address, server.edition).await;
147 (server, result)
148 }
149 });
150
151 futures::stream::iter(futures)
152 .buffer_unordered(self.max_parallel)
153 .collect()
154 .await
155 }
156
157 fn parse_address(address: &str, default_port: u16) -> Result<(&str, u16), McError> {
159 if let Some((host, port_str)) = address.split_once(':') {
160 let port = port_str.parse::<u16>()
161 .map_err(|e| McError::InvalidPort(e.to_string()))?;
162 Ok((host, port))
163 } else {
164 Ok((address, default_port))
165 }
166 }
167
168 async fn resolve_dns(&self, host: &str, port: u16) -> Result<SocketAddr, McError> {
169 let cache_key = format!("{}:{}", host, port);
170
171 if let Some(entry) = DNS_CACHE.get(&cache_key) {
173 let (addr, timestamp) = *entry.value();
174 if timestamp.elapsed().map(|d| d.as_secs() < DNS_CACHE_TTL).unwrap_or(false) {
175 return Ok(addr);
176 }
177 }
178
179 let addrs: Vec<SocketAddr> = format!("{}:{}", host, port)
181 .to_socket_addrs()
182 .map_err(|e| McError::DnsError(e.to_string()))?
183 .collect();
184
185 let addr = addrs.iter()
186 .find(|a| a.is_ipv4())
187 .or_else(|| addrs.first())
188 .copied()
189 .ok_or_else(|| McError::DnsError("No addresses resolved".to_string()))?;
190
191 DNS_CACHE.insert(cache_key, (addr, SystemTime::now()));
192 Ok(addr)
193 }
194
195 async fn get_dns_info(&self, host: &str) -> Result<DnsInfo, McError> {
196 let addrs: Vec<SocketAddr> = format!("{}:0", host)
198 .to_socket_addrs()
199 .map_err(|e| McError::DnsError(e.to_string()))?
200 .collect();
201
202 Ok(DnsInfo {
203 a_records: addrs.iter().map(|a| a.ip().to_string()).collect(),
204 cname: None, ttl: 300,
206 })
207 }
208
209 async fn send_handshake(&self, stream: &mut TcpStream, host: &str, port: u16) -> Result<(), McError> {
210 let mut handshake = Vec::with_capacity(64);
211 write_var_int(&mut handshake, 0x00);
212 write_var_int(&mut handshake, 47);
213 write_string(&mut handshake, host);
214 handshake.extend_from_slice(&port.to_be_bytes());
215 write_var_int(&mut handshake, 1);
216
217 let mut packet = Vec::with_capacity(handshake.len() + 5);
218 write_var_int(&mut packet, handshake.len() as i32);
219 packet.extend_from_slice(&handshake);
220
221 timeout(self.timeout, stream.write_all(&packet))
222 .await
223 .map_err(|_| McError::Timeout)?
224 .map_err(McError::IoError)
225 }
226
227 async fn send_status_request(&self, stream: &mut TcpStream) -> Result<(), McError> {
228 let mut status_request = Vec::with_capacity(5);
229 write_var_int(&mut status_request, 0x00);
230
231 let mut status_packet = Vec::with_capacity(status_request.len() + 5);
232 write_var_int(&mut status_packet, status_request.len() as i32);
233 status_packet.extend_from_slice(&status_request);
234
235 timeout(self.timeout, stream.write_all(&status_packet))
236 .await
237 .map_err(|_| McError::Timeout)?
238 .map_err(McError::IoError)
239 }
240
241 async fn read_response(&self, stream: &mut TcpStream) -> Result<Vec<u8>, McError> {
242 let mut response = Vec::with_capacity(1024);
243 let mut buf = [0u8; 4096];
244 let mut expected_length = None;
245
246 loop {
247 let n = timeout(self.timeout, stream.read(&mut buf))
248 .await
249 .map_err(|_| McError::Timeout)?
250 .map_err(McError::IoError)?;
251
252 if n == 0 {
253 break;
254 }
255
256 response.extend_from_slice(&buf[..n]);
257
258 if expected_length.is_none() && response.len() >= 5 {
260 let mut cursor = Cursor::new(&response);
261 if let Ok(packet_length) = read_var_int(&mut cursor) {
262 expected_length = Some(cursor.position() as usize + packet_length as usize);
263 }
264 }
265
266 if let Some(expected) = expected_length {
267 if response.len() >= expected {
268 break;
269 }
270 }
271 }
272
273 if response.is_empty() {
274 return Err(McError::InvalidResponse("No response from server".to_string()));
275 }
276
277 Ok(response)
278 }
279
280 fn parse_java_response(&self, response: Vec<u8>, start: SystemTime) -> Result<(serde_json::Value, f64), McError> {
281 let mut cursor = Cursor::new(&response);
282 let packet_length = read_var_int(&mut cursor)
283 .map_err(|e| McError::InvalidResponse(format!("Failed to read packet length: {}", e)))?;
284
285 let total_expected = cursor.position() as usize + packet_length as usize;
286 if response.len() < total_expected {
287 return Err(McError::InvalidResponse(format!(
288 "Incomplete packet: expected {}, got {}",
289 total_expected,
290 response.len()
291 )));
292 }
293
294 let packet_id = read_var_int(&mut cursor)
295 .map_err(|e| McError::InvalidResponse(format!("Failed to read packet ID: {}", e)))?;
296
297 if packet_id != 0x00 {
298 return Err(McError::InvalidResponse(format!("Unexpected packet ID: {}", packet_id)));
299 }
300
301 let json_length = read_var_int(&mut cursor)
302 .map_err(|e| McError::InvalidResponse(format!("Failed to read JSON length: {}", e)))?;
303
304 if cursor.position() as usize + json_length as usize > response.len() {
305 return Err(McError::InvalidResponse("JSON data truncated".to_string()));
306 }
307
308 let json_buf = &response[cursor.position() as usize..cursor.position() as usize + json_length as usize];
309 let json_str = String::from_utf8(json_buf.to_vec())
310 .map_err(McError::Utf8Error)?;
311
312 let json: serde_json::Value = serde_json::from_str(&json_str)
313 .map_err(McError::JsonError)?;
314
315 let latency = start.elapsed()
316 .map_err(|_| McError::InvalidResponse("Time error".to_string()))?
317 .as_secs_f64() * 1000.0;
318
319 Ok((json, latency))
320 }
321
322 fn parse_java_json(&self, json: &serde_json::Value) -> Result<JavaStatus, McError> {
323 let version = JavaVersion {
324 name: json["version"]["name"].as_str().unwrap_or("Unknown").to_string(),
325 protocol: json["version"]["protocol"].as_i64().unwrap_or(0),
326 };
327
328 let players = JavaPlayers {
329 online: json["players"]["online"].as_i64().unwrap_or(0),
330 max: json["players"]["max"].as_i64().unwrap_or(0),
331 sample: if let Some(sample) = json["players"]["sample"].as_array() {
332 Some(sample.iter().filter_map(|p| {
333 Some(JavaPlayer {
334 name: p["name"].as_str()?.to_string(),
335 id: p["id"].as_str()?.to_string(),
336 })
337 }).collect())
338 } else {
339 None
340 },
341 };
342
343 let description = if let Some(desc) = json["description"].as_str() {
344 desc.to_string()
345 } else if let Some(text) = json["description"]["text"].as_str() {
346 text.to_string()
347 } else {
348 "No description".to_string()
349 };
350
351 let favicon = json["favicon"].as_str().map(|s| s.to_string());
352 let map = json["map"].as_str().map(|s| s.to_string());
353 let gamemode = json["gamemode"].as_str().map(|s| s.to_string());
354 let software = json["software"].as_str().map(|s| s.to_string());
355
356 let plugins = if let Some(plugins_array) = json["plugins"].as_array() {
357 Some(plugins_array.iter().filter_map(|p| {
358 Some(JavaPlugin {
359 name: p["name"].as_str()?.to_string(),
360 version: p["version"].as_str().map(|s| s.to_string()),
361 })
362 }).collect())
363 } else {
364 None
365 };
366
367 let mods = if let Some(mods_array) = json["mods"].as_array() {
368 Some(mods_array.iter().filter_map(|m| {
369 Some(JavaMod {
370 modid: m["modid"].as_str()?.to_string(),
371 version: m["version"].as_str().map(|s| s.to_string()),
372 })
373 }).collect())
374 } else {
375 None
376 };
377
378 Ok(JavaStatus {
379 version,
380 players,
381 description,
382 favicon,
383 map,
384 gamemode,
385 software,
386 plugins,
387 mods,
388 raw_data: json.clone(),
389 })
390 }
391
392 fn create_bedrock_ping_packet(&self) -> Vec<u8> {
393 let mut ping_packet = Vec::with_capacity(35);
394 ping_packet.push(0x01);
395 ping_packet.extend_from_slice(&(SystemTime::now()
396 .duration_since(SystemTime::UNIX_EPOCH)
397 .unwrap()
398 .as_millis() as u64)
399 .to_be_bytes());
400 ping_packet.extend_from_slice(&[0x00, 0xFF, 0xFF, 0x00, 0xFE, 0xFE, 0xFE, 0xFE, 0xFD, 0xFD, 0xFD, 0xFD, 0x12, 0x34, 0x56, 0x78]);
401 ping_packet.extend_from_slice(&[0x00; 8]);
402 ping_packet
403 }
404
405 fn parse_bedrock_response(&self, pong_data: &str) -> Result<BedrockStatus, McError> {
406 let parts: Vec<&str> = pong_data.split(';').collect();
407
408 if parts.len() < 6 {
409 return Err(McError::InvalidResponse("Invalid Bedrock response".to_string()));
410 }
411
412 Ok(BedrockStatus {
413 edition: parts[0].to_string(),
414 motd: parts[1].to_string(),
415 protocol_version: parts[2].to_string(),
416 version: parts[3].to_string(),
417 online_players: parts[4].to_string(),
418 max_players: parts[5].to_string(),
419 server_uid: parts.get(6).map_or("", |s| *s).to_string(),
420 motd2: parts.get(7).map_or("", |s| *s).to_string(),
421 game_mode: parts.get(8).map_or("", |s| *s).to_string(),
422 game_mode_numeric: parts.get(9).map_or("", |s| *s).to_string(),
423 port_ipv4: parts.get(10).map_or("", |s| *s).to_string(),
424 port_ipv6: parts.get(11).map_or("", |s| *s).to_string(),
425 map: parts.get(12).map(|s| s.to_string()),
426 software: parts.get(13).map(|s| s.to_string()),
427 raw_data: pong_data.to_string(),
428 })
429 }
430}
431
432fn write_var_int(buffer: &mut Vec<u8>, value: i32) {
434 let mut value = value as u32;
435 loop {
436 let mut temp = (value & 0x7F) as u8;
437 value >>= 7;
438 if value != 0 {
439 temp |= 0x80;
440 }
441 buffer.push(temp);
442 if value == 0 {
443 break;
444 }
445 }
446}
447
448fn write_string(buffer: &mut Vec<u8>, s: &str) {
449 write_var_int(buffer, s.len() as i32);
450 buffer.extend_from_slice(s.as_bytes());
451}
452
453fn read_var_int(reader: &mut impl std::io::Read) -> Result<i32, String> {
454 let mut result = 0i32;
455 let mut shift = 0;
456 loop {
457 let mut byte = [0u8];
458 reader.read_exact(&mut byte).map_err(|e| e.to_string())?;
459 let value = byte[0] as i32;
460 result |= (value & 0x7F) << shift;
461 shift += 7;
462 if shift > 35 {
463 return Err("VarInt too big".to_string());
464 }
465 if (value & 0x80) == 0 {
466 break;
467 }
468 }
469 Ok(result)
470}