1use crate::error::McError;
2use crate::models::*;
3use dashmap::DashMap;
4use once_cell::sync::Lazy;
5use std::net::{SocketAddr, ToSocketAddrs};
6use std::time::{Duration, SystemTime};
7use tokio::io::{AsyncReadExt, AsyncWriteExt};
8use tokio::net::{TcpStream, UdpSocket};
9use tokio::time::timeout;
10
11static DNS_CACHE: Lazy<DashMap<String, SocketAddr>> = Lazy::new(DashMap::new);
12
13#[derive(Clone)]
14pub struct McClient {
15 timeout: Duration,
16 max_parallel: usize,
17}
18
19impl Default for McClient {
20 fn default() -> Self {
21 Self {
22 timeout: Duration::from_secs(10),
23 max_parallel: 10,
24 }
25 }
26}
27
28impl McClient {
29 pub fn new() -> Self {
30 Self::default()
31 }
32
33 pub fn with_timeout(mut self, timeout: Duration) -> Self {
34 self.timeout = timeout;
35 self
36 }
37
38 pub fn with_max_parallel(mut self, max_parallel: usize) -> Self {
39 self.max_parallel = max_parallel;
40 self
41 }
42
43 pub async fn ping(&self, address: &str, edition: ServerEdition) -> Result<ServerStatus, McError> {
44 match edition {
45 ServerEdition::Java => self.ping_java(address).await,
46 ServerEdition::Bedrock => self.ping_bedrock(address).await,
47 }
48 }
49
50 pub async fn ping_java(&self, address: &str) -> Result<ServerStatus, McError> {
51 let start = SystemTime::now();
52 let (host, port_str) = address.split_once(':').unwrap_or((address, "25565"));
53 let port = port_str.parse::<u16>().map_err(|e| McError::InvalidPort(e.to_string()))?;
54
55 let resolved = self.resolve_dns(host, port).await?;
56
57 let mut stream = timeout(self.timeout, TcpStream::connect(resolved))
58 .await
59 .map_err(|_| McError::Timeout)?
60 .map_err(|e| McError::ConnectionError(e.to_string()))?;
61
62 stream.set_nodelay(true).map_err(|e| McError::IoError(e))?;
63
64 let mut handshake = Vec::with_capacity(64);
66 write_var_int(&mut handshake, 0x00);
67 write_var_int(&mut handshake, 47);
68 write_string(&mut handshake, host);
69 handshake.extend_from_slice(&port.to_be_bytes());
70 write_var_int(&mut handshake, 1);
71
72 let mut packet = Vec::with_capacity(handshake.len() + 5);
73 write_var_int(&mut packet, handshake.len() as i32);
74 packet.extend_from_slice(&handshake);
75
76 timeout(self.timeout, stream.write_all(&packet))
77 .await
78 .map_err(|_| McError::Timeout)?
79 .map_err(|e| McError::IoError(e))?;
80
81 let mut status_request = Vec::with_capacity(5);
83 write_var_int(&mut status_request, 0x00);
84
85 let mut status_packet = Vec::with_capacity(status_request.len() + 5);
86 write_var_int(&mut status_packet, status_request.len() as i32);
87 status_packet.extend_from_slice(&status_request);
88
89 timeout(self.timeout, stream.write_all(&status_packet))
90 .await
91 .map_err(|_| McError::Timeout)?
92 .map_err(|e| McError::IoError(e))?;
93
94 let mut response = Vec::with_capacity(1024);
96 let mut buf = [0u8; 4096];
97
98 loop {
99 let read_result = timeout(self.timeout, stream.read(&mut buf)).await;
100
101 match read_result {
102 Ok(Ok(0)) => break,
103 Ok(Ok(n)) => {
104 response.extend_from_slice(&buf[..n]);
105
106 if response.len() >= 5 {
107 let mut cursor = std::io::Cursor::new(&response);
108 if let Ok(packet_length) = read_var_int(&mut cursor) {
109 let total_length = cursor.position() as usize + packet_length as usize;
110 if response.len() >= total_length {
111 break;
112 }
113 }
114 }
115 }
116 Ok(Err(e)) => return Err(McError::IoError(e)),
117 Err(_) => return Err(McError::Timeout),
118 }
119 }
120
121 if response.is_empty() {
122 return Err(McError::InvalidResponse("No response from server".to_string()));
123 }
124
125 let mut cursor = std::io::Cursor::new(&response);
126 let packet_length = read_var_int(&mut cursor).map_err(|e| McError::InvalidResponse(e))?;
127
128 let total_expected = cursor.position() as usize + packet_length as usize;
129 if response.len() < total_expected {
130 return Err(McError::InvalidResponse(format!("Incomplete packet: expected {}, got {}", total_expected, response.len())));
131 }
132
133 let packet_id = read_var_int(&mut cursor).map_err(|e| McError::InvalidResponse(e))?;
134 if packet_id != 0x00 {
135 return Err(McError::InvalidResponse(format!("Unexpected packet ID: {}", packet_id)));
136 }
137
138 let json_length = read_var_int(&mut cursor).map_err(|e| McError::InvalidResponse(e))?;
139 if cursor.position() as usize + json_length as usize > response.len() {
140 return Err(McError::InvalidResponse("JSON data truncated".to_string()));
141 }
142
143 let json_buf = &response[cursor.position() as usize..cursor.position() as usize + json_length as usize];
144 let json_str = String::from_utf8(json_buf.to_vec()).map_err(|e| McError::Utf8Error(e))?;
145 let json: serde_json::Value = serde_json::from_str(&json_str).map_err(|e| McError::JsonError(e))?;
146
147 let latency = start.elapsed().map_err(|_| McError::InvalidResponse("Time error".to_string()))?.as_secs_f64() * 1000.0;
148
149 let version = JavaVersion {
151 name: json["version"]["name"].as_str().unwrap_or("Unknown").to_string(),
152 protocol: json["version"]["protocol"].as_i64().unwrap_or(0),
153 };
154
155 let players = JavaPlayers {
156 online: json["players"]["online"].as_i64().unwrap_or(0),
157 max: json["players"]["max"].as_i64().unwrap_or(0),
158 sample: if let Some(sample) = json["players"]["sample"].as_array() {
159 Some(sample.iter().filter_map(|p| {
160 Some(JavaPlayer {
161 name: p["name"].as_str()?.to_string(),
162 id: p["id"].as_str()?.to_string(),
163 })
164 }).collect())
165 } else {
166 None
167 },
168 };
169
170 let description = if let Some(desc) = json["description"].as_str() {
171 desc.to_string()
172 } else if let Some(text) = json["description"]["text"].as_str() {
173 text.to_string()
174 } else {
175 "No description".to_string()
176 };
177
178 let favicon = json["favicon"].as_str().map(|s| s.to_string());
179
180 Ok(ServerStatus {
181 online: true,
182 latency,
183 data: ServerData::Java(JavaStatus {
184 version,
185 players,
186 description,
187 favicon,
188 raw_data: json,
189 }),
190 })
191 }
192
193 pub async fn ping_bedrock(&self, address: &str) -> Result<ServerStatus, McError> {
194 let start = SystemTime::now();
195 let (host, port_str) = address.split_once(':').unwrap_or((address, "19132"));
196 let port = port_str.parse::<u16>().map_err(|e| McError::InvalidPort(e.to_string()))?;
197
198 let resolved = self.resolve_dns(host, port).await?;
199
200 let socket = UdpSocket::bind("0.0.0.0:0").await.map_err(|e| McError::IoError(e))?;
201
202 let mut ping_packet = Vec::with_capacity(35);
203 ping_packet.push(0x01);
204 ping_packet.extend_from_slice(&(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
205 .unwrap()
206 .as_millis() as u64)
207 .to_be_bytes());
208 ping_packet.extend_from_slice(&[0x00, 0xFF, 0xFF, 0x00, 0xFE, 0xFE, 0xFE, 0xFE, 0xFD, 0xFD, 0xFD, 0xFD, 0x12, 0x34, 0x56, 0x78]);
209 ping_packet.extend_from_slice(&[0x00; 8]);
210
211 timeout(self.timeout, socket.send_to(&ping_packet, resolved))
212 .await
213 .map_err(|_| McError::Timeout)?
214 .map_err(|e| McError::IoError(e))?;
215
216 let mut buf = [0u8; 1024];
217 let (len, _) = timeout(self.timeout, socket.recv_from(&mut buf))
218 .await
219 .map_err(|_| McError::Timeout)?
220 .map_err(|e| McError::IoError(e))?;
221
222 let latency = start.elapsed().map_err(|_| McError::InvalidResponse("Time error".to_string()))?.as_secs_f64() * 1000.0;
223
224 if len < 35 {
225 return Err(McError::InvalidResponse("Response too short".to_string()));
226 }
227
228 let pong_data = String::from_utf8_lossy(&buf[35..len]).to_string();
229 let parts: Vec<&str> = pong_data.split(';').collect();
230
231 if parts.len() < 6 {
232 return Err(McError::InvalidResponse("Invalid Bedrock response".to_string()));
233 }
234
235 let status = BedrockStatus {
236 edition: parts[0].to_string(),
237 motd: parts[1].to_string(),
238 protocol_version: parts[2].to_string(),
239 version: parts[3].to_string(),
240 online_players: parts[4].to_string(),
241 max_players: parts[5].to_string(),
242 server_uid: parts.get(6).map_or("", |s| *s).to_string(),
243 motd2: parts.get(7).map_or("", |s| *s).to_string(),
244 game_mode: parts.get(8).map_or("", |s| *s).to_string(),
245 game_mode_numeric: parts.get(9).map_or("", |s| *s).to_string(),
246 port_ipv4: parts.get(10).map_or("", |s| *s).to_string(),
247 port_ipv6: parts.get(11).map_or("", |s| *s).to_string(),
248 raw_data: pong_data,
249 };
250
251 Ok(ServerStatus {
252 online: true,
253 latency,
254 data: ServerData::Bedrock(status),
255 })
256 }
257
258 pub async fn ping_many(&self, servers: &[ServerInfo]) -> Vec<(ServerInfo, Result<ServerStatus, McError>)> {
259 use futures::stream::StreamExt;
260 use tokio::sync::Semaphore;
261
262 let semaphore = std::sync::Arc::new(Semaphore::new(self.max_parallel));
263 let client = self.clone();
264
265 let futures = servers.iter().map(|server| {
266 let server = server.clone();
267 let semaphore = semaphore.clone();
268 let client = client.clone();
269
270 async move {
271 let _permit = semaphore.acquire().await;
272 let result = client.ping(&server.address, server.edition).await;
273 (server, result)
274 }
275 });
276
277 let mut results = Vec::new();
278 let mut stream = futures::stream::iter(futures).buffer_unordered(self.max_parallel);
279
280 while let Some(result) = stream.next().await {
281 results.push(result);
282 }
283
284 results
285 }
286
287 async fn resolve_dns(&self, host: &str, port: u16) -> Result<SocketAddr, McError> {
288 let cache_key = format!("{}:{}", host, port);
289
290 if let Some(addr) = DNS_CACHE.get(&cache_key) {
291 return Ok(*addr);
292 }
293
294 let addrs: Vec<SocketAddr> = format!("{}:{}", host, port)
295 .to_socket_addrs()
296 .map_err(|e| McError::DnsError(e.to_string()))?
297 .collect();
298
299 if addrs.is_empty() {
300 return Err(McError::DnsError("No addresses resolved".to_string()));
301 }
302
303 let addr = addrs.iter()
305 .find(|a| a.is_ipv4())
306 .or_else(|| addrs.first())
307 .copied()
308 .unwrap();
309
310 DNS_CACHE.insert(cache_key, addr);
311 Ok(addr)
312 }
313}
314
315fn write_var_int(buffer: &mut Vec<u8>, value: i32) {
317 let mut value = value as u32;
318 loop {
319 let mut temp = (value & 0x7F) as u8;
320 value >>= 7;
321 if value != 0 {
322 temp |= 0x80;
323 }
324 buffer.push(temp);
325 if value == 0 {
326 break;
327 }
328 }
329}
330
331fn write_string(buffer: &mut Vec<u8>, s: &str) {
332 write_var_int(buffer, s.len() as i32);
333 buffer.extend_from_slice(s.as_bytes());
334}
335
336fn read_var_int(reader: &mut impl std::io::Read) -> Result<i32, String> {
337 let mut result = 0i32;
338 let mut shift = 0;
339 loop {
340 let mut byte = [0u8];
341 reader.read_exact(&mut byte).map_err(|e| e.to_string())?;
342 let value = byte[0] as i32;
343 result |= (value & 0x7F) << shift;
344 shift += 7;
345 if shift > 35 {
346 return Err("VarInt too big".to_string());
347 }
348 if (value & 0x80) == 0 {
349 break;
350 }
351 }
352 Ok(result)
353}