elytra_ping/
protocol.rs

1pub use self::frame::{Frame, FrameError, ServerState};
2use crate::mc_string::encode_mc_string;
3use crate::mc_string::McStringError;
4#[cfg(feature = "java_parse")]
5use crate::parse::JavaServerInfo;
6use bytes::{Buf, BytesMut};
7use mc_varint::{VarInt, VarIntWrite};
8use snafu::OptionExt;
9use snafu::{Backtrace, GenerateImplicitData, Snafu};
10use std::str::FromStr;
11use std::{
12    fmt::Debug,
13    io::{Cursor, Write},
14    time::Duration,
15};
16use tokio::{
17    io::{AsyncReadExt, AsyncWriteExt, BufWriter},
18    net::TcpStream,
19};
20use tracing::error;
21use tracing::info;
22use tracing::{debug, event, instrument, trace, Level};
23
24mod frame;
25
26#[derive(Snafu, Debug)]
27pub enum ProtocolError {
28    /// I/O error.
29    #[snafu(display("I/O error: {source}"), context(false))]
30    Io {
31        source: std::io::Error,
32        backtrace: Backtrace,
33    },
34    /// Failed to encode string as bytes.
35    #[snafu(display("Failed to encode string as bytes: {source}"), context(false))]
36    StringEncodeFailed {
37        #[snafu(backtrace)]
38        source: McStringError,
39    },
40    /// Failed to send a packet because it was longer than the maximum i32.
41    PacketTooLong { backtrace: Backtrace },
42    /// Connection closed unexpectedly.
43    ConnectionClosed { backtrace: Backtrace },
44    /// Failed to parse a packet.
45    #[snafu(display("Failed to parse a packet: {source}"), context(false))]
46    ParseFailed {
47        #[snafu(backtrace)]
48        source: FrameError,
49    },
50    /// Failed to resolve SRV record.
51    #[snafu(display("Failed to resolve SRV record: {source}"), context(false))]
52    SrvResolveError {
53        source: trust_dns_resolver::error::ResolveError,
54        backtrace: Backtrace,
55    },
56    /// Packet received out of order.
57    FrameOutOfOrder {
58        backtrace: Backtrace,
59        expected: &'static str,
60        got: Frame,
61    },
62    /// Failed to parse JSON response.
63    #[snafu(display("Failed to parse JSON response: {source}"), context(false))]
64    JsonParse {
65        source: serde_json::Error,
66        backtrace: Backtrace,
67    },
68    /// DNS lookup failed.
69    #[snafu(display("DNS lookup failed for address `{address}`."))]
70    DNSLookupFailed {
71        address: String,
72        backtrace: Backtrace,
73    },
74}
75
76#[derive(Debug)]
77pub struct SlpProtocol {
78    hostname: String,
79    port: u16,
80    stream: BufWriter<TcpStream>,
81    buffer: BytesMut,
82}
83
/// The state requested in a handshake frame.
///
/// The discriminant is the numeric value sent on the wire (it is cast to
/// `i32` and wrapped in a VarInt when the handshake frame is built).
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtocolState {
    /// Request the status (server list ping) flow.
    Status = 1,
    /// Request the login flow.
    Login = 2,
}
89impl SlpProtocol {
90    pub fn new(hostname: String, port: u16, stream: TcpStream) -> Self {
91        Self {
92            hostname,
93            port,
94            stream: BufWriter::new(stream),
95            buffer: BytesMut::with_capacity(4096),
96        }
97    }
98
99    pub fn create_handshake_frame(&self) -> Frame {
100        Frame::Handshake {
101            protocol: VarInt::from(Frame::PROTOCOL_VERSION),
102            address: self.hostname.to_owned(),
103            port: self.port,
104            state: VarInt::from(ProtocolState::Status as i32),
105        }
106    }
107
108    /// Sends frame data over the connection as a packet.
109    pub async fn write_frame(&mut self, frame: Frame) -> Result<(), ProtocolError> {
110        debug!("Writing frame: {frame:?}");
111
112        let mut packet_data: Vec<u8> = Vec::with_capacity(5);
113
114        match frame {
115            Frame::Handshake {
116                protocol,
117                address,
118                port,
119                state,
120            } => {
121                trace!("writing handshake frame");
122                packet_data.write_var_int(VarInt::from(Frame::HANDSHAKE_ID))?;
123                packet_data.write_var_int(protocol)?;
124                Write::write(&mut packet_data, &encode_mc_string(&address)?)?;
125                Write::write(&mut packet_data, &port.to_be_bytes())?;
126                packet_data.write_var_int(state)?;
127            }
128            Frame::StatusRequest => {
129                trace!("writing status request frame");
130                packet_data.write_var_int(VarInt::from(Frame::STATUS_REQUEST_ID))?;
131            }
132            Frame::StatusResponse { json } => {
133                trace!("writing status response frame");
134                packet_data.write_var_int(VarInt::from(Frame::STATUS_RESPONSE_ID))?;
135                Write::write(&mut packet_data, &encode_mc_string(&json)?)?;
136            }
137            Frame::PingRequest { payload } => {
138                trace!("writing ping request frame");
139                packet_data.write_var_int(VarInt::from(Frame::PING_REQUEST_ID))?;
140                Write::write(&mut packet_data, &payload.to_be_bytes())?;
141            }
142            Frame::PingResponse { payload } => {
143                trace!("writing ping response frame");
144                packet_data.write_var_int(VarInt::from(Frame::PING_RESPONSE_ID))?;
145                Write::write(&mut packet_data, &payload.to_be_bytes())?;
146            }
147        }
148
149        let len = VarInt::from(i32::try_from(packet_data.len()).unwrap());
150        event!(
151            Level::TRACE,
152            "combining packet length (of {}) and data",
153            packet_data.len()
154        );
155        let mut packet: Vec<u8> = Vec::with_capacity(packet_data.len() + 5);
156        packet.write_var_int(len)?;
157        Write::write(&mut packet, &packet_data)?;
158
159        trace!("sending the packet!");
160        self.stream.write_all(&packet).await?;
161        self.stream.flush().await?;
162        Ok(())
163    }
164
165    /// Receive and parse a frame from the connection.
166    ///
167    /// # Arguments
168    ///
169    /// * `server_state` - Switches between which type of frame to accept. Set to None to accept frames for the client.
170    pub async fn read_frame(
171        &mut self,
172        server_state: Option<ServerState>,
173    ) -> Result<Option<Frame>, ProtocolError> {
174        loop {
175            // Attempt to parse a frame from the buffered data. If enough data
176            // has been buffered, the frame is returned.
177            if let Some(frame) = self.parse_frame(server_state)? {
178                debug!("Received frame: {frame:?}");
179                return Ok(Some(frame));
180            }
181
182            // There is not enough buffered data to read a frame. Attempt to
183            // read more data from the socket.
184            //
185            // On success, the number of bytes is returned. `0` indicates "end
186            // of stream".
187            let bytes_read = self.stream.read_buf(&mut self.buffer).await?;
188            if bytes_read == 0 {
189                // The remote closed the connection. For this to be a clean
190                // shutdown, there should be no data in the read buffer. If
191                // there is, this means that the peer closed the socket while
192                // sending a frame.
193                if self.buffer.is_empty() {
194                    info!("Connection closed cleanly");
195                    return Ok(None);
196                }
197                error!("Connection closed unexpectedly");
198                return Err(ProtocolError::ConnectionClosed {
199                    backtrace: Backtrace::generate(),
200                });
201            }
202        }
203    }
204
205    /// Parse the most recent frame from the connection, removing it from the buffer.
206    ///
207    /// # Arguments
208    ///
209    /// * `server_state` - Switches between which type of frame to accept. Set to None to accept frames for the client.
210    pub fn parse_frame(
211        &mut self,
212        server_state: Option<ServerState>,
213    ) -> Result<Option<Frame>, ProtocolError> {
214        let mut cursor = Cursor::new(&self.buffer[..]);
215
216        // Check whether a full frame is available
217        match Frame::check(&mut cursor) {
218            Ok(()) => {
219                let frame = Frame::parse(&mut cursor, server_state)?;
220
221                trace!("Discarding frame from buffer");
222                // current cursor position is the entire frame
223                self.buffer.advance(cursor.position() as usize);
224
225                // Return the frame to the caller.
226                Ok(Some(frame))
227            }
228            // Not enough data has been buffered
229            Err(FrameError::Incomplete { .. }) => Ok(None),
230            // An error was encountered
231            Err(e) => Err(e.into()),
232        }
233    }
234
235    pub async fn disconnect(mut self) -> Result<(), ProtocolError> {
236        self.stream.shutdown().await?;
237        Ok(())
238    }
239
240    #[cfg(feature = "simple")]
241    pub async fn handshake(&mut self) -> Result<(), ProtocolError> {
242        self.write_frame(self.create_handshake_frame()).await?;
243        Ok(())
244    }
245
246    #[cfg(feature = "simple")]
247    pub async fn get_status(&mut self) -> Result<JavaServerInfo, ProtocolError> {
248        self.write_frame(Frame::StatusRequest).await?;
249        let frame = self
250            .read_frame(None)
251            .await?
252            .context(ConnectionClosedSnafu)?;
253        let frame_data = match frame {
254            Frame::StatusResponse { json } => json,
255            frame => {
256                return FrameOutOfOrderSnafu {
257                    expected: "StatusResponse",
258                    got: frame,
259                }
260                .fail()
261            }
262        };
263        Ok(JavaServerInfo::from_str(&frame_data)?)
264    }
265
266    #[cfg(feature = "simple")]
267    pub async fn get_latency(&mut self) -> Result<Duration, ProtocolError> {
268        use std::time::Instant;
269        const PING_PAYLOAD: i64 = 54321;
270
271        let ping_time = Instant::now();
272
273        self.write_frame(Frame::PingRequest {
274            payload: PING_PAYLOAD,
275        })
276        .await?;
277        let frame = self
278            .read_frame(None)
279            .await?
280            .context(ConnectionClosedSnafu)?;
281        match frame {
282            Frame::PingResponse { .. } | Frame::StatusResponse { .. } => Ok(ping_time.elapsed()),
283            frame => FrameOutOfOrderSnafu {
284                expected: "PingResponse",
285                got: frame,
286            }
287            .fail(),
288        }
289    }
290}
291
/// Resolves `addrs` and opens a TCP connection to the server.
///
/// A `_minecraft._tcp.<host>` SRV lookup is attempted first. If it yields a
/// record, both the host and the port are replaced by the record's target and
/// port; if the lookup fails or is empty, `addrs` is used unchanged. The
/// resulting host is then resolved and the first address returned is dialed.
///
/// The host and port stored in the returned [`SlpProtocol`] are the ones that
/// were actually used after SRV resolution.
///
/// # Errors
///
/// * [`ProtocolError::SrvResolveError`] if the system resolver cannot be
///   created.
/// * [`ProtocolError::DNSLookupFailed`] if the host resolves to no addresses.
/// * [`ProtocolError::Io`] if the lookup or the TCP connection fails.
#[cfg(feature = "java_connect")]
#[instrument]
pub async fn connect(mut addrs: (String, u16)) -> Result<SlpProtocol, ProtocolError> {
    use tokio::net::lookup_host;
    use tracing::{debug, info};
    use trust_dns_resolver::TokioAsyncResolver;

    let resolver = TokioAsyncResolver::tokio_from_system_conf()?;
    // A failed SRV lookup is not an error: fall back to the given address.
    if let Ok(records) = resolver
        .srv_lookup(format!("_minecraft._tcp.{}", addrs.0))
        .await
    {
        if let Some(record) = records.iter().next() {
            let target = record.target().to_utf8();
            // An SRV record names a port as well as a host; dialing the
            // caller's port against the SRV target would hit the wrong
            // service whenever the two differ.
            let port = record.port();
            debug!("Found SRV record: {} -> {}:{}", addrs.0, target, port);
            addrs = (target, port);
        }
    }

    // lookup_host can return multiple but we just need one so we discard the rest
    let socket_addrs = match lookup_host(addrs.clone()).await?.next() {
        Some(socket_addrs) => socket_addrs,
        None => {
            info!("DNS lookup failed for address");
            return DNSLookupFailedSnafu { address: addrs.0 }.fail();
        }
    };

    match TcpStream::connect(socket_addrs).await {
        Ok(stream) => {
            info!("Connected to SLP server");
            Ok(SlpProtocol::new(addrs.0, addrs.1, stream))
        }
        Err(error) => {
            info!("Failed to connect to SLP server: {}", error);
            Err(error.into())
        }
    }
}