1pub use self::frame::{Frame, FrameError, ServerState};
2use crate::mc_string::encode_mc_string;
3use crate::mc_string::McStringError;
4#[cfg(feature = "java_parse")]
5use crate::parse::JavaServerInfo;
6use bytes::{Buf, BytesMut};
7use mc_varint::{VarInt, VarIntWrite};
8use snafu::OptionExt;
9use snafu::{Backtrace, GenerateImplicitData, Snafu};
10use std::str::FromStr;
11use std::{
12 fmt::Debug,
13 io::{Cursor, Write},
14 time::Duration,
15};
16use tokio::{
17 io::{AsyncReadExt, AsyncWriteExt, BufWriter},
18 net::TcpStream,
19};
20use tracing::error;
21use tracing::info;
22use tracing::{debug, event, instrument, trace, Level};
23
24mod frame;
25
26#[derive(Snafu, Debug)]
27pub enum ProtocolError {
28 #[snafu(display("I/O error: {source}"), context(false))]
30 Io {
31 source: std::io::Error,
32 backtrace: Backtrace,
33 },
34 #[snafu(display("Failed to encode string as bytes: {source}"), context(false))]
36 StringEncodeFailed {
37 #[snafu(backtrace)]
38 source: McStringError,
39 },
40 PacketTooLong { backtrace: Backtrace },
42 ConnectionClosed { backtrace: Backtrace },
44 #[snafu(display("Failed to parse a packet: {source}"), context(false))]
46 ParseFailed {
47 #[snafu(backtrace)]
48 source: FrameError,
49 },
50 #[snafu(display("Failed to resolve SRV record: {source}"), context(false))]
52 SrvResolveError {
53 source: trust_dns_resolver::error::ResolveError,
54 backtrace: Backtrace,
55 },
56 FrameOutOfOrder {
58 backtrace: Backtrace,
59 expected: &'static str,
60 got: Frame,
61 },
62 #[snafu(display("Failed to parse JSON response: {source}"), context(false))]
64 JsonParse {
65 source: serde_json::Error,
66 backtrace: Backtrace,
67 },
68 #[snafu(display("DNS lookup failed for address `{address}`."))]
70 DNSLookupFailed {
71 address: String,
72 backtrace: Backtrace,
73 },
74}
75
76#[derive(Debug)]
77pub struct SlpProtocol {
78 hostname: String,
79 port: u16,
80 stream: BufWriter<TcpStream>,
81 buffer: BytesMut,
82}
83
84#[repr(i32)]
85pub enum ProtocolState {
86 Status = 1,
87 Login = 2,
88}
89impl SlpProtocol {
90 pub fn new(hostname: String, port: u16, stream: TcpStream) -> Self {
91 Self {
92 hostname,
93 port,
94 stream: BufWriter::new(stream),
95 buffer: BytesMut::with_capacity(4096),
96 }
97 }
98
99 pub fn create_handshake_frame(&self) -> Frame {
100 Frame::Handshake {
101 protocol: VarInt::from(Frame::PROTOCOL_VERSION),
102 address: self.hostname.to_owned(),
103 port: self.port,
104 state: VarInt::from(ProtocolState::Status as i32),
105 }
106 }
107
108 pub async fn write_frame(&mut self, frame: Frame) -> Result<(), ProtocolError> {
110 debug!("Writing frame: {frame:?}");
111
112 let mut packet_data: Vec<u8> = Vec::with_capacity(5);
113
114 match frame {
115 Frame::Handshake {
116 protocol,
117 address,
118 port,
119 state,
120 } => {
121 trace!("writing handshake frame");
122 packet_data.write_var_int(VarInt::from(Frame::HANDSHAKE_ID))?;
123 packet_data.write_var_int(protocol)?;
124 Write::write(&mut packet_data, &encode_mc_string(&address)?)?;
125 Write::write(&mut packet_data, &port.to_be_bytes())?;
126 packet_data.write_var_int(state)?;
127 }
128 Frame::StatusRequest => {
129 trace!("writing status request frame");
130 packet_data.write_var_int(VarInt::from(Frame::STATUS_REQUEST_ID))?;
131 }
132 Frame::StatusResponse { json } => {
133 trace!("writing status response frame");
134 packet_data.write_var_int(VarInt::from(Frame::STATUS_RESPONSE_ID))?;
135 Write::write(&mut packet_data, &encode_mc_string(&json)?)?;
136 }
137 Frame::PingRequest { payload } => {
138 trace!("writing ping request frame");
139 packet_data.write_var_int(VarInt::from(Frame::PING_REQUEST_ID))?;
140 Write::write(&mut packet_data, &payload.to_be_bytes())?;
141 }
142 Frame::PingResponse { payload } => {
143 trace!("writing ping response frame");
144 packet_data.write_var_int(VarInt::from(Frame::PING_RESPONSE_ID))?;
145 Write::write(&mut packet_data, &payload.to_be_bytes())?;
146 }
147 }
148
149 let len = VarInt::from(i32::try_from(packet_data.len()).unwrap());
150 event!(
151 Level::TRACE,
152 "combining packet length (of {}) and data",
153 packet_data.len()
154 );
155 let mut packet: Vec<u8> = Vec::with_capacity(packet_data.len() + 5);
156 packet.write_var_int(len)?;
157 Write::write(&mut packet, &packet_data)?;
158
159 trace!("sending the packet!");
160 self.stream.write_all(&packet).await?;
161 self.stream.flush().await?;
162 Ok(())
163 }
164
165 pub async fn read_frame(
171 &mut self,
172 server_state: Option<ServerState>,
173 ) -> Result<Option<Frame>, ProtocolError> {
174 loop {
175 if let Some(frame) = self.parse_frame(server_state)? {
178 debug!("Received frame: {frame:?}");
179 return Ok(Some(frame));
180 }
181
182 let bytes_read = self.stream.read_buf(&mut self.buffer).await?;
188 if bytes_read == 0 {
189 if self.buffer.is_empty() {
194 info!("Connection closed cleanly");
195 return Ok(None);
196 }
197 error!("Connection closed unexpectedly");
198 return Err(ProtocolError::ConnectionClosed {
199 backtrace: Backtrace::generate(),
200 });
201 }
202 }
203 }
204
205 pub fn parse_frame(
211 &mut self,
212 server_state: Option<ServerState>,
213 ) -> Result<Option<Frame>, ProtocolError> {
214 let mut cursor = Cursor::new(&self.buffer[..]);
215
216 match Frame::check(&mut cursor) {
218 Ok(()) => {
219 let frame = Frame::parse(&mut cursor, server_state)?;
220
221 trace!("Discarding frame from buffer");
222 self.buffer.advance(cursor.position() as usize);
224
225 Ok(Some(frame))
227 }
228 Err(FrameError::Incomplete { .. }) => Ok(None),
230 Err(e) => Err(e.into()),
232 }
233 }
234
235 pub async fn disconnect(mut self) -> Result<(), ProtocolError> {
236 self.stream.shutdown().await?;
237 Ok(())
238 }
239
240 #[cfg(feature = "simple")]
241 pub async fn handshake(&mut self) -> Result<(), ProtocolError> {
242 self.write_frame(self.create_handshake_frame()).await?;
243 Ok(())
244 }
245
246 #[cfg(feature = "simple")]
247 pub async fn get_status(&mut self) -> Result<JavaServerInfo, ProtocolError> {
248 self.write_frame(Frame::StatusRequest).await?;
249 let frame = self
250 .read_frame(None)
251 .await?
252 .context(ConnectionClosedSnafu)?;
253 let frame_data = match frame {
254 Frame::StatusResponse { json } => json,
255 frame => {
256 return FrameOutOfOrderSnafu {
257 expected: "StatusResponse",
258 got: frame,
259 }
260 .fail()
261 }
262 };
263 Ok(JavaServerInfo::from_str(&frame_data)?)
264 }
265
266 #[cfg(feature = "simple")]
267 pub async fn get_latency(&mut self) -> Result<Duration, ProtocolError> {
268 use std::time::Instant;
269 const PING_PAYLOAD: i64 = 54321;
270
271 let ping_time = Instant::now();
272
273 self.write_frame(Frame::PingRequest {
274 payload: PING_PAYLOAD,
275 })
276 .await?;
277 let frame = self
278 .read_frame(None)
279 .await?
280 .context(ConnectionClosedSnafu)?;
281 match frame {
282 Frame::PingResponse { .. } | Frame::StatusResponse { .. } => Ok(ping_time.elapsed()),
283 frame => FrameOutOfOrderSnafu {
284 expected: "PingResponse",
285 got: frame,
286 }
287 .fail(),
288 }
289 }
290}
291
292#[cfg(feature = "java_connect")]
293#[instrument]
294pub async fn connect(mut addrs: (String, u16)) -> Result<SlpProtocol, ProtocolError> {
295 use tokio::net::lookup_host;
296 use tracing::{debug, info};
297 use trust_dns_resolver::TokioAsyncResolver;
298
299 let resolver = TokioAsyncResolver::tokio_from_system_conf()?;
300 if let Ok(records) = resolver
301 .srv_lookup(format!("_minecraft._tcp.{}", addrs.0))
302 .await
303 {
304 if let Some(record) = records.iter().next() {
305 let record = record.target().to_utf8();
306 debug!("Found SRV record: {} -> {}", addrs.0, record);
307 addrs.0 = record;
308 }
309 }
310
311 let socket_addrs = match lookup_host(addrs.clone()).await?.next() {
313 Some(socket_addrs) => socket_addrs,
314 None => {
315 info!("DNS lookup failed for address");
316 return DNSLookupFailedSnafu { address: addrs.0 }.fail();
317 }
318 };
319
320 match TcpStream::connect(socket_addrs).await {
321 Ok(stream) => {
322 info!("Connected to SLP server");
323 Ok(SlpProtocol::new(addrs.0, addrs.1, stream))
324 }
325 Err(error) => {
326 info!("Failed to connect to SLP server: {}", error);
327 Err(error.into())
328 }
329 }
330}