use std::io::Cursor;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use dashmap::DashMap;
use once_cell::sync::Lazy;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpStream, UdpSocket};
use tokio::sync::OnceCell;
use tokio::time::timeout;
use trust_dns_resolver::{
config::{ResolverConfig, ResolverOpts},
TokioAsyncResolver,
};
use crate::error::McError;
use crate::models::*;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_MAX_PARALLEL: usize = 10;
const DNS_CACHE_TTL: u64 = 300;
const JAVA_DEFAULT_PORT: u16 = 25565;
const BEDROCK_DEFAULT_PORT: u16 = 19132;
const PROTOCOL_VERSION: i32 = 47;
const HANDSHAKE_PACKET_ID: i32 = 0x00;
const STATUS_REQUEST_PACKET_ID: i32 = 0x00;
const STATUS_RESPONSE_PACKET_ID: i32 = 0x00;
const MAX_VARINT_SHIFT: u32 = 35;
const INITIAL_PACKET_CAPACITY: usize = 64;
const INITIAL_RESPONSE_CAPACITY: usize = 1024;
const READ_BUFFER_SIZE: usize = 4096;
const BEDROCK_PING_PACKET_SIZE: usize = 35;
const BEDROCK_MIN_RESPONSE_SIZE: usize = 35;
static DNS_CACHE: Lazy<DashMap<String, (SocketAddr, SystemTime)>> = Lazy::new(DashMap::new);
static SRV_CACHE: Lazy<DashMap<String, ((String, u16), SystemTime)>> = Lazy::new(DashMap::new);
static RESOLVER: OnceCell<Arc<TokioAsyncResolver>> = OnceCell::const_new();
async fn get_resolver() -> Arc<TokioAsyncResolver> {
RESOLVER
.get_or_init(|| async {
let config = ResolverConfig::default();
let mut opts = ResolverOpts::default();
opts.cache_size = 1000;
opts.positive_min_ttl = Some(Duration::from_secs(60));
opts.negative_min_ttl = Some(Duration::from_secs(10));
Arc::new(TokioAsyncResolver::tokio(config, opts))
})
.await
.clone()
}
#[derive(Clone)]
pub struct McClient {
timeout: Duration,
max_parallel: usize,
}
impl Default for McClient {
fn default() -> Self {
Self {
timeout: DEFAULT_TIMEOUT,
max_parallel: DEFAULT_MAX_PARALLEL,
}
}
}
impl McClient {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
#[must_use]
pub fn with_max_parallel(mut self, max_parallel: usize) -> Self {
self.max_parallel = max_parallel;
self
}
#[must_use]
pub fn max_parallel(&self) -> usize {
self.max_parallel
}
#[must_use]
pub fn timeout(&self) -> Duration {
self.timeout
}
pub fn clear_dns_cache(&self) {
DNS_CACHE.clear();
}
pub fn clear_srv_cache(&self) {
SRV_CACHE.clear();
}
pub fn clear_all_caches(&self) {
self.clear_dns_cache();
self.clear_srv_cache();
}
#[must_use]
pub fn cache_stats(&self) -> CacheStats {
CacheStats {
dns_entries: DNS_CACHE.len(),
srv_entries: SRV_CACHE.len(),
}
}
pub async fn resolve_dns_timed(&self, host: &str, port: u16) -> Result<(SocketAddr, f64), McError> {
let start = std::time::Instant::now();
let addr = self.resolve_dns(host, port).await?;
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
Ok((addr, elapsed))
}
pub async fn ping(&self, address: &str, edition: ServerEdition) -> Result<ServerStatus, McError> {
match edition {
ServerEdition::Java => self.ping_java(address).await,
ServerEdition::Bedrock => self.ping_bedrock(address).await,
}
}
pub async fn ping_java(&self, address: &str) -> Result<ServerStatus, McError> {
let start = SystemTime::now();
let (host, port) = Self::parse_address(address, JAVA_DEFAULT_PORT)?;
let port_explicit = address.contains(':');
let (actual_host, actual_port) = if port_explicit {
(host.to_string(), port)
} else {
self.lookup_srv_record(host, port).await?
};
let resolved = self.resolve_dns(&actual_host, actual_port).await?;
let dns_info = self.get_dns_info(host).await.ok();
let mut stream = timeout(self.timeout, TcpStream::connect(resolved))
.await
.map_err(|_| McError::Timeout)?
.map_err(|e| McError::ConnectionError(e.to_string()))?;
stream.set_nodelay(true).map_err(McError::IoError)?;
self.send_handshake(&mut stream, host, actual_port).await?;
self.send_status_request(&mut stream).await?;
let response = self.read_response(&mut stream).await?;
let (json, latency) = self.parse_java_response(response, start)?;
Ok(ServerStatus {
online: true,
ip: resolved.ip().to_string(),
port: resolved.port(),
hostname: host.to_string(),
latency,
dns: dns_info,
data: ServerData::Java(self.parse_java_json(&json)?),
})
}
pub async fn ping_bedrock(&self, address: &str) -> Result<ServerStatus, McError> {
let start = SystemTime::now();
let (host, port) = Self::parse_address(address, BEDROCK_DEFAULT_PORT)?;
let resolved = self.resolve_dns(host, port).await?;
let dns_info = self.get_dns_info(host).await.ok();
let socket = UdpSocket::bind("0.0.0.0:0").await.map_err(McError::IoError)?;
let ping_packet = self.create_bedrock_ping_packet();
timeout(self.timeout, socket.send_to(&ping_packet, resolved))
.await
.map_err(|_| McError::Timeout)?
.map_err(McError::IoError)?;
let mut buf = [0u8; READ_BUFFER_SIZE];
let (len, _) = timeout(self.timeout, socket.recv_from(&mut buf))
.await
.map_err(|_| McError::Timeout)?
.map_err(McError::IoError)?;
if len < BEDROCK_MIN_RESPONSE_SIZE {
return Err(McError::InvalidResponse("Response too short".to_string()));
}
let latency = start
.elapsed()
.map_err(|_| McError::InvalidResponse("Time error".to_string()))?
.as_secs_f64()
* 1000.0;
let pong_data = String::from_utf8_lossy(&buf[BEDROCK_PING_PACKET_SIZE..len]).to_string();
Ok(ServerStatus {
online: true,
ip: resolved.ip().to_string(),
port: resolved.port(),
hostname: host.to_string(),
latency,
dns: dns_info,
data: ServerData::Bedrock(self.parse_bedrock_response(&pong_data)?),
})
}
pub async fn is_online(&self, address: &str, edition: ServerEdition) -> bool {
self.ping(address, edition).await.is_ok()
}
pub async fn ping_many(&self, servers: &[ServerInfo]) -> Vec<(ServerInfo, Result<ServerStatus, McError>)> {
use futures::stream::StreamExt;
use tokio::sync::Semaphore;
let semaphore = Arc::new(Semaphore::new(self.max_parallel));
let client = self.clone();
let futures = servers.iter().map(|server| {
let server = server.clone();
let semaphore = semaphore.clone();
let client = client.clone();
async move {
let _permit = semaphore.acquire().await;
let result = client.ping(&server.address, server.edition).await;
(server, result)
}
});
futures::stream::iter(futures)
.buffer_unordered(self.max_parallel)
.collect()
.await
}
fn parse_address(address: &str, default_port: u16) -> Result<(&str, u16), McError> {
if let Some((host, port_str)) = address.split_once(':') {
let port = port_str
.parse::<u16>()
.map_err(|e| McError::InvalidPort(e.to_string()))?;
Ok((host, port))
} else {
Ok((address, default_port))
}
}
async fn lookup_srv_record(&self, host: &str, port: u16) -> Result<(String, u16), McError> {
let cache_key = format!("srv:{}", host);
if let Some(entry) = SRV_CACHE.get(&cache_key) {
let ((cached_host, cached_port), timestamp) = entry.value();
if timestamp
.elapsed()
.map(|d| d.as_secs() < DNS_CACHE_TTL)
.unwrap_or(false)
{
return Ok((cached_host.clone(), *cached_port));
}
}
let srv_name = format!("_minecraft._tcp.{}", host);
let resolver = get_resolver().await;
match timeout(self.timeout, resolver.srv_lookup(&srv_name)).await {
Ok(Ok(srv_lookup)) => {
if let Some(srv) = srv_lookup.iter().next() {
let target = srv.target().to_string().trim_end_matches('.').to_string();
let srv_port = srv.port();
let result = (target, srv_port);
SRV_CACHE.insert(cache_key, (result.clone(), SystemTime::now()));
return Ok(result);
}
}
Ok(Err(_)) => {
}
Err(_) => {
}
}
let result = (host.to_string(), port);
SRV_CACHE.insert(cache_key, (result.clone(), SystemTime::now()));
Ok(result)
}
async fn resolve_dns(&self, host: &str, port: u16) -> Result<SocketAddr, McError> {
let cache_key = format!("{}:{}", host, port);
if let Some(entry) = DNS_CACHE.get(&cache_key) {
let (addr, timestamp) = *entry.value();
if timestamp
.elapsed()
.map(|d| d.as_secs() < DNS_CACHE_TTL)
.unwrap_or(false)
{
return Ok(addr);
}
}
let addrs: Vec<SocketAddr> = format!("{}:{}", host, port)
.to_socket_addrs()
.map_err(|e| McError::DnsError(e.to_string()))?
.collect();
let addr = addrs
.iter()
.find(|a| a.is_ipv4())
.or_else(|| addrs.first())
.copied()
.ok_or_else(|| McError::DnsError("No addresses resolved".to_string()))?;
DNS_CACHE.insert(cache_key, (addr, SystemTime::now()));
Ok(addr)
}
async fn get_dns_info(&self, host: &str) -> Result<DnsInfo, McError> {
let addrs: Vec<SocketAddr> = format!("{}:0", host)
.to_socket_addrs()
.map_err(|e| McError::DnsError(e.to_string()))?
.collect();
Ok(DnsInfo {
a_records: addrs.iter().map(|a| a.ip().to_string()).collect(),
cname: None, ttl: DNS_CACHE_TTL as u32,
})
}
async fn send_handshake(&self, stream: &mut TcpStream, host: &str, port: u16) -> Result<(), McError> {
let mut handshake = Vec::with_capacity(INITIAL_PACKET_CAPACITY);
write_var_int(&mut handshake, HANDSHAKE_PACKET_ID);
write_var_int(&mut handshake, PROTOCOL_VERSION);
write_string(&mut handshake, host);
handshake.extend_from_slice(&port.to_be_bytes());
write_var_int(&mut handshake, 1);
let mut packet = Vec::with_capacity(handshake.len() + 5);
write_var_int(&mut packet, handshake.len() as i32);
packet.extend_from_slice(&handshake);
timeout(self.timeout, stream.write_all(&packet))
.await
.map_err(|_| McError::Timeout)?
.map_err(McError::IoError)
}
async fn send_status_request(&self, stream: &mut TcpStream) -> Result<(), McError> {
let mut status_request = Vec::with_capacity(5);
write_var_int(&mut status_request, STATUS_REQUEST_PACKET_ID);
let mut status_packet = Vec::with_capacity(status_request.len() + 5);
write_var_int(&mut status_packet, status_request.len() as i32);
status_packet.extend_from_slice(&status_request);
timeout(self.timeout, stream.write_all(&status_packet))
.await
.map_err(|_| McError::Timeout)?
.map_err(McError::IoError)
}
async fn read_response(&self, stream: &mut TcpStream) -> Result<Vec<u8>, McError> {
let mut response = Vec::with_capacity(INITIAL_RESPONSE_CAPACITY);
let mut buf = [0u8; READ_BUFFER_SIZE];
let mut expected_length = None;
loop {
let n = timeout(self.timeout, stream.read(&mut buf))
.await
.map_err(|_| McError::Timeout)?
.map_err(McError::IoError)?;
if n == 0 {
break;
}
response.extend_from_slice(&buf[..n]);
if expected_length.is_none() && response.len() >= 5 {
let mut cursor = Cursor::new(&response);
if let Ok(packet_length) = read_var_int(&mut cursor) {
expected_length = Some(cursor.position() as usize + packet_length as usize);
}
}
if let Some(expected) = expected_length {
if response.len() >= expected {
break;
}
}
}
if response.is_empty() {
return Err(McError::InvalidResponse("No response from server".to_string()));
}
Ok(response)
}
fn parse_java_response(&self, response: Vec<u8>, start: SystemTime) -> Result<(serde_json::Value, f64), McError> {
let mut cursor = Cursor::new(&response);
let packet_length = read_var_int(&mut cursor)
.map_err(|e| McError::InvalidResponse(format!("Failed to read packet length: {}", e)))?;
let total_expected = cursor.position() as usize + packet_length as usize;
if response.len() < total_expected {
return Err(McError::InvalidResponse(format!(
"Incomplete packet: expected {}, got {}",
total_expected, response.len()
)));
}
let packet_id = read_var_int(&mut cursor)
.map_err(|e| McError::InvalidResponse(format!("Failed to read packet ID: {}", e)))?;
if packet_id != STATUS_RESPONSE_PACKET_ID {
return Err(McError::InvalidResponse(format!("Unexpected packet ID: {}", packet_id)));
}
let json_length = read_var_int(&mut cursor)
.map_err(|e| McError::InvalidResponse(format!("Failed to read JSON length: {}", e)))?;
if cursor.position() as usize + json_length as usize > response.len() {
return Err(McError::InvalidResponse("JSON data truncated".to_string()));
}
let json_buf = &response[cursor.position() as usize..cursor.position() as usize + json_length as usize];
let json_str = String::from_utf8(json_buf.to_vec()).map_err(McError::Utf8Error)?;
let json: serde_json::Value = serde_json::from_str(&json_str).map_err(McError::JsonError)?;
let latency = start
.elapsed()
.map_err(|_| McError::InvalidResponse("Time error".to_string()))?
.as_secs_f64()
* 1000.0;
Ok((json, latency))
}
fn parse_java_json(&self, json: &serde_json::Value) -> Result<JavaStatus, McError> {
let version = JavaVersion {
name: json["version"]["name"]
.as_str()
.unwrap_or("Unknown")
.to_string(),
protocol: json["version"]["protocol"].as_i64().unwrap_or(0),
};
let players = JavaPlayers {
online: json["players"]["online"].as_i64().unwrap_or(0),
max: json["players"]["max"].as_i64().unwrap_or(0),
sample: if let Some(sample) = json["players"]["sample"].as_array() {
Some(
sample
.iter()
.filter_map(|p| {
Some(JavaPlayer {
name: p["name"].as_str()?.to_string(),
id: p["id"].as_str()?.to_string(),
})
})
.collect(),
)
} else {
None
},
};
let description = if let Some(desc) = json["description"].as_str() {
desc.to_string()
} else if let Some(text) = json["description"]["text"].as_str() {
text.to_string()
} else {
"No description".to_string()
};
let favicon = json["favicon"].as_str().map(|s| s.to_string());
let map = json["map"].as_str().map(|s| s.to_string());
let gamemode = json["gamemode"].as_str().map(|s| s.to_string());
let software = json["software"].as_str().map(|s| s.to_string());
let plugins = if let Some(plugins_array) = json["plugins"].as_array() {
Some(
plugins_array
.iter()
.filter_map(|p| {
Some(JavaPlugin {
name: p["name"].as_str()?.to_string(),
version: p["version"].as_str().map(|s| s.to_string()),
})
})
.collect(),
)
} else {
None
};
let mods = if let Some(mods_array) = json["mods"].as_array() {
Some(
mods_array
.iter()
.filter_map(|m| {
Some(JavaMod {
modid: m["modid"].as_str()?.to_string(),
version: m["version"].as_str().map(|s| s.to_string()),
})
})
.collect(),
)
} else {
None
};
Ok(JavaStatus {
version,
players,
description,
favicon,
map,
gamemode,
software,
plugins,
mods,
raw_data: json.clone(),
})
}
fn create_bedrock_ping_packet(&self) -> Vec<u8> {
let mut ping_packet = Vec::with_capacity(BEDROCK_PING_PACKET_SIZE);
ping_packet.push(0x01);
ping_packet.extend_from_slice(
&(SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64)
.to_be_bytes(),
);
ping_packet.extend_from_slice(&[
0x00, 0xFF, 0xFF, 0x00, 0xFE, 0xFE, 0xFE, 0xFE, 0xFD, 0xFD, 0xFD, 0xFD, 0x12, 0x34, 0x56, 0x78,
]);
ping_packet.extend_from_slice(&[0x00; 8]);
ping_packet
}
fn parse_bedrock_response(&self, pong_data: &str) -> Result<BedrockStatus, McError> {
let parts: Vec<&str> = pong_data.split(';').collect();
if parts.len() < 6 {
return Err(McError::InvalidResponse("Invalid Bedrock response".to_string()));
}
Ok(BedrockStatus {
edition: parts[0].to_string(),
motd: parts[1].to_string(),
protocol_version: parts[2].to_string(),
version: parts[3].to_string(),
online_players: parts[4].to_string(),
max_players: parts[5].to_string(),
server_uid: parts.get(6).map_or("", |s| *s).to_string(),
motd2: parts.get(7).map_or("", |s| *s).to_string(),
game_mode: parts.get(8).map_or("", |s| *s).to_string(),
game_mode_numeric: parts.get(9).map_or("", |s| *s).to_string(),
port_ipv4: parts.get(10).map_or("", |s| *s).to_string(),
port_ipv6: parts.get(11).map_or("", |s| *s).to_string(),
map: parts.get(12).map(|s| s.to_string()),
software: parts.get(13).map(|s| s.to_string()),
raw_data: pong_data.to_string(),
})
}
}
fn write_var_int(buffer: &mut Vec<u8>, value: i32) {
let mut value = value as u32;
loop {
let mut temp = (value & 0x7F) as u8;
value >>= 7;
if value != 0 {
temp |= 0x80;
}
buffer.push(temp);
if value == 0 {
break;
}
}
}
fn write_string(buffer: &mut Vec<u8>, s: &str) {
write_var_int(buffer, s.len() as i32);
buffer.extend_from_slice(s.as_bytes());
}
fn read_var_int(reader: &mut impl std::io::Read) -> Result<i32, String> {
let mut result = 0i32;
let mut shift = 0;
loop {
let mut byte = [0u8];
reader.read_exact(&mut byte).map_err(|e| e.to_string())?;
let value = byte[0] as i32;
result |= (value & 0x7F) << shift;
shift += 7;
if shift > MAX_VARINT_SHIFT {
return Err("VarInt too big".to_string());
}
if (value & 0x80) == 0 {
break;
}
}
Ok(result)
}