Skip to main content

elytra_ping/
protocol.rs

1pub use self::frame::{Frame, FrameError};
2use crate::mc_string::encode_mc_string;
3use crate::mc_string::McStringError;
4#[cfg(feature = "java_parse")]
5use crate::parse::JavaServerInfo;
6use bytes::{Buf, BytesMut};
7use mc_varint::{VarInt, VarIntWrite};
8use snafu::OptionExt;
9use snafu::{Backtrace, GenerateImplicitData, Snafu};
10use std::str::FromStr;
11use std::time::Instant;
12use std::{
13    fmt::Debug,
14    io::{Cursor, Write},
15    time::Duration,
16};
17use tokio::{
18    io::{AsyncReadExt, AsyncWriteExt, BufWriter},
19    net::TcpStream,
20};
21use tracing::error;
22use tracing::info;
23use tracing::{debug, event, instrument, trace, Level};
24
25mod frame;
26
27#[derive(Snafu, Debug)]
28pub enum ProtocolError {
29    /// I/O error.
30    #[snafu(display("I/O error: {source}"), context(false))]
31    Io {
32        source: std::io::Error,
33        backtrace: Backtrace,
34    },
35    /// Failed to encode string as bytes.
36    #[snafu(display("Failed to encode string as bytes: {source}"), context(false))]
37    StringEncodeFailed {
38        #[snafu(backtrace)]
39        source: McStringError,
40    },
41    /// Failed to send a packet because it was longer than the maximum i32.
42    PacketTooLong { backtrace: Backtrace },
43    /// Connection closed unexpectedly.
44    ConnectionClosed { backtrace: Backtrace },
45    /// Failed to parse a packet.
46    #[snafu(display("Failed to parse a packet: {source}"), context(false))]
47    ParseFailed {
48        #[snafu(backtrace)]
49        source: FrameError,
50    },
51    /// Failed to resolve SRV record.
52    #[snafu(display("Failed to resolve SRV record: {source}"), context(false))]
53    SrvResolveError {
54        source: trust_dns_resolver::error::ResolveError,
55        backtrace: Backtrace,
56    },
57    /// Packet received out of order.
58    FrameOutOfOrder {
59        backtrace: Backtrace,
60        expected: &'static str,
61        got: Frame,
62    },
63    /// Failed to parse JSON response.
64    #[snafu(display("Failed to parse JSON response: {source}"))]
65    #[cfg(feature = "java_parse")]
66    JsonParse {
67        source: serde_json::Error,
68        backtrace: Backtrace,
69        json: String,
70    },
71    /// DNS lookup failed.
72    #[snafu(display("DNS lookup failed for address `{address}`."))]
73    DNSLookupFailed {
74        address: String,
75        backtrace: Backtrace,
76    },
77}
78
79#[derive(Debug)]
80pub struct SlpProtocol {
81    hostname: String,
82    port: u16,
83    stream: BufWriter<TcpStream>,
84    buffer: BytesMut,
85}
86
87#[repr(i32)]
88pub enum ProtocolState {
89    Status = 1,
90    Login = 2,
91}
92impl SlpProtocol {
93    pub fn new(hostname: String, port: u16, stream: TcpStream) -> Self {
94        Self {
95            hostname,
96            port,
97            stream: BufWriter::new(stream),
98            buffer: BytesMut::with_capacity(4096),
99        }
100    }
101
102    pub fn create_handshake_frame(&self) -> Frame {
103        Frame::Handshake {
104            protocol: VarInt::from(Frame::PROTOCOL_VERSION),
105            address: self.hostname.to_owned(),
106            port: self.port,
107            state: VarInt::from(ProtocolState::Status as i32),
108        }
109    }
110
111    /// Sends frame data over the connection as a packet.
112    pub async fn write_frame(&mut self, frame: Frame) -> Result<(), ProtocolError> {
113        debug!("Writing frame: {frame:?}");
114
115        let mut packet_data: Vec<u8> = Vec::with_capacity(5);
116
117        match frame {
118            Frame::Handshake {
119                protocol,
120                address,
121                port,
122                state,
123            } => {
124                trace!("writing handshake frame");
125                packet_data.write_var_int(VarInt::from(Frame::HANDSHAKE_ID))?;
126                packet_data.write_var_int(protocol)?;
127                Write::write(&mut packet_data, &encode_mc_string(&address)?)?;
128                Write::write(&mut packet_data, &port.to_be_bytes())?;
129                packet_data.write_var_int(state)?;
130            }
131            Frame::StatusRequest => {
132                trace!("writing status request frame");
133                packet_data.write_var_int(VarInt::from(Frame::STATUS_REQUEST_ID))?;
134            }
135            Frame::StatusResponse { json } => {
136                trace!("writing status response frame");
137                packet_data.write_var_int(VarInt::from(Frame::STATUS_RESPONSE_ID))?;
138                Write::write(&mut packet_data, &encode_mc_string(&json)?)?;
139            }
140            Frame::PingRequest { payload } => {
141                trace!("writing ping request frame");
142                packet_data.write_var_int(VarInt::from(Frame::PING_REQUEST_ID))?;
143                Write::write(&mut packet_data, &payload.to_be_bytes())?;
144            }
145            Frame::PingResponse { payload } => {
146                trace!("writing ping response frame");
147                packet_data.write_var_int(VarInt::from(Frame::PING_RESPONSE_ID))?;
148                Write::write(&mut packet_data, &payload.to_be_bytes())?;
149            }
150        }
151
152        let len = VarInt::from(i32::try_from(packet_data.len()).unwrap());
153        event!(
154            Level::TRACE,
155            "combining packet length (of {}) and data",
156            packet_data.len()
157        );
158        let mut packet: Vec<u8> = Vec::with_capacity(packet_data.len() + 5);
159        packet.write_var_int(len)?;
160        Write::write(&mut packet, &packet_data)?;
161
162        trace!("sending the packet!");
163        self.stream.write_all(&packet).await?;
164        self.stream.flush().await?;
165        Ok(())
166    }
167
168    /// Receive and parse a frame from the connection.
169    ///
170    /// # Arguments
171    ///
172    /// * `server_state` - Switches between which type of frame to accept. Set to None to accept frames for the client.
173    pub async fn read_frame(&mut self) -> Result<Option<Frame>, ProtocolError> {
174        loop {
175            // Attempt to parse a frame from the buffered data. If enough data
176            // has been buffered, the frame is returned.
177            if let Some(frame) = self.parse_frame()? {
178                debug!("Received frame: {frame:?}");
179                return Ok(Some(frame));
180            }
181
182            // There is not enough buffered data to read a frame. Attempt to
183            // read more data from the socket.
184            //
185            // On success, the number of bytes is returned. `0` indicates "end
186            // of stream".
187            let bytes_read = self.stream.read_buf(&mut self.buffer).await?;
188            if bytes_read == 0 {
189                // The remote closed the connection. For this to be a clean
190                // shutdown, there should be no data in the read buffer. If
191                // there is, this means that the peer closed the socket while
192                // sending a frame.
193                if self.buffer.is_empty() {
194                    info!("Connection closed cleanly");
195                    return Ok(None);
196                }
197                error!("Connection closed unexpectedly");
198                return Err(ProtocolError::ConnectionClosed {
199                    backtrace: Backtrace::generate(),
200                });
201            }
202        }
203    }
204
205    /// Parse the most recent frame from the connection, removing it from the buffer.
206    ///
207    /// # Arguments
208    ///
209    /// * `server_state` - Switches between which type of frame to accept. Set to None to accept frames for the client.
210    pub fn parse_frame(&mut self) -> Result<Option<Frame>, ProtocolError> {
211        let mut cursor = Cursor::new(&self.buffer[..]);
212
213        // Check whether a full frame is available
214        match Frame::check(&mut cursor) {
215            Ok(()) => {
216                let frame = Frame::parse(&mut cursor)?;
217
218                trace!("Discarding frame from buffer");
219                // current cursor position is the entire frame
220                self.buffer.advance(cursor.position() as usize);
221
222                // Return the frame to the caller.
223                Ok(Some(frame))
224            }
225            // Not enough data has been buffered
226            Err(FrameError::Incomplete { .. }) => Ok(None),
227            // An error was encountered
228            Err(e) => Err(e.into()),
229        }
230    }
231
232    pub async fn disconnect(mut self) -> Result<(), ProtocolError> {
233        self.stream.shutdown().await?;
234        Ok(())
235    }
236
237    pub async fn handshake(&mut self) -> Result<(), ProtocolError> {
238        self.write_frame(self.create_handshake_frame()).await?;
239        Ok(())
240    }
241
242    #[cfg(feature = "java_parse")]
243    pub async fn get_status(&mut self) -> Result<JavaServerInfo, ProtocolError> {
244        use snafu::ResultExt;
245
246        self.write_frame(Frame::StatusRequest).await?;
247        let frame = self.read_frame().await?.context(ConnectionClosedSnafu)?;
248        let json = match frame {
249            Frame::StatusResponse { json } => json,
250            frame => {
251                return FrameOutOfOrderSnafu {
252                    expected: "StatusResponse",
253                    got: frame,
254                }
255                .fail()
256            }
257        };
258
259        JavaServerInfo::from_str(&json).with_context(|_| JsonParseSnafu { json })
260    }
261
262    pub async fn get_latency(&mut self) -> Result<Duration, ProtocolError> {
263        const PING_PAYLOAD: i64 = 54321;
264
265        let ping_time = Instant::now();
266
267        self.write_frame(Frame::PingRequest {
268            payload: PING_PAYLOAD,
269        })
270        .await?;
271        let frame = self.read_frame().await?.context(ConnectionClosedSnafu)?;
272        match frame {
273            Frame::PingResponse { .. } | Frame::StatusResponse { .. } => Ok(ping_time.elapsed()),
274            frame => FrameOutOfOrderSnafu {
275                expected: "PingResponse",
276                got: frame,
277            }
278            .fail(),
279        }
280    }
281}
282
283#[cfg(feature = "java_connect")]
284#[instrument]
285pub async fn connect(mut addrs: (String, u16)) -> Result<SlpProtocol, ProtocolError> {
286    use tokio::net::lookup_host;
287    use tracing::{debug, info};
288    use trust_dns_resolver::TokioAsyncResolver;
289
290    let resolver = TokioAsyncResolver::tokio_from_system_conf()?;
291    if let Ok(records) = resolver
292        .srv_lookup(format!("_minecraft._tcp.{}", addrs.0))
293        .await
294    {
295        if let Some(record) = records.iter().next() {
296            let record = record.target().to_utf8();
297            debug!("Found SRV record: {} -> {}", addrs.0, record);
298            addrs.0 = record;
299        }
300    }
301
302    // lookup_host can return multiple but we just need one so we discard the rest
303    let socket_addrs = match lookup_host(addrs.clone()).await?.next() {
304        Some(socket_addrs) => socket_addrs,
305        None => {
306            info!("DNS lookup failed for address");
307            return DNSLookupFailedSnafu { address: addrs.0 }.fail();
308        }
309    };
310
311    match TcpStream::connect(socket_addrs).await {
312        Ok(stream) => {
313            info!("Connected to SLP server");
314            Ok(SlpProtocol::new(addrs.0, addrs.1, stream))
315        }
316        Err(error) => {
317            info!("Failed to connect to SLP server: {}", error);
318            Err(error.into())
319        }
320    }
321}