veloren_query_server/
server.rs

1#[allow(deprecated)] use std::hash::SipHasher;
2use std::{
3    hash::{Hash, Hasher},
4    io::{self, ErrorKind},
5    net::SocketAddr,
6    sync::{Arc, Mutex},
7    time::{Duration, Instant},
8};
9
10use protocol::Parcel;
11use rand::{thread_rng, Rng};
12use tokio::{net::UdpSocket, sync::watch};
13use tracing::{debug, error, trace};
14
15use crate::{
16    proto::{
17        Init, QueryServerRequest, QueryServerResponse, RawQueryServerRequest,
18        RawQueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER,
19        VERSION,
20    },
21    ratelimit::{RateLimiter, ReducedIpAddr},
22};
23
/// Maximum age of the hashing secrets: once more than this much time has
/// passed since the last refresh, `QueryServer::run` generates a new pair
/// (five minutes).
// NOTE: the identifier is misspelled ("INTERNVAL"); the spelling is kept
// because the constant is referenced by this name elsewhere in the file.
const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(5 * 60);
25
/// State of the UDP query server; construct it with `QueryServer::new` and
/// drive it with `QueryServer::run`.
pub struct QueryServer {
    /// Local address the UDP socket is bound (and, if needed, re-bound) to.
    addr: SocketAddr,
    /// Watch channel whose current value is copied into `ServerInfo`
    /// responses.
    server_info: watch::Receiver<ServerInfo>,
    /// Settings handed to `Parcel::read` when decoding incoming requests.
    settings: protocol::Settings,
    /// Rate limiter consulted per remote IP before a verified request is
    /// answered; it is maintained from the receive loop.
    ratelimit: RateLimiter,
}
32
/// Traffic counters of the query server.
///
/// The receive loop builds a fresh value for a received datagram and adds it
/// onto the shared instance; the consumer reads and clears that instance via
/// `Metrics::reset`.
#[derive(Default, Clone, Copy, Debug)]
pub struct Metrics {
    /// Datagrams received on the socket.
    pub received_packets: u32,
    /// Datagrams rejected by the size/header/version validation.
    pub dropped_packets: u32,
    /// Validated datagrams whose payload could not be parsed as a request.
    pub invalid_packets: u32,
    /// Never written in this file.
    // NOTE(review): presumably meant for failures while processing a request
    // — confirm. The misspelling ("proccessing") is part of the public API.
    pub proccessing_errors: u32,
    /// `ServerInfo` requests that passed the `p` check and the rate limiter.
    pub info_requests: u32,
    /// `Init` requests that passed the `p` check and the rate limiter (`Init`
    /// responses sent because of a `p` mismatch are not counted here).
    pub init_requests: u32,
    /// Responses for which sending on the socket succeeded.
    pub sent_responses: u32,
    /// Responses for which sending on the socket returned an error.
    pub failed_responses: u32,
    /// Never written in this file.
    // NOTE(review): presumably updated by a different component — confirm.
    pub timed_out_responses: u32,
    /// Verified requests rejected by the rate limiter.
    pub ratelimited: u32,
}
46
47impl QueryServer {
48    pub fn new(addr: SocketAddr, server_info: watch::Receiver<ServerInfo>, ratelimit: u16) -> Self {
49        Self {
50            addr,
51            server_info,
52            ratelimit: RateLimiter::new(ratelimit),
53            settings: Default::default(),
54        }
55    }
56
57    /// This produces TRACE level logs for any packet received on the assigned
58    /// port. To prevent potentially unfettered log spam, disable the TRACE
59    /// level for this crate (when outside of debugging contexts).
60    ///
61    /// NOTE: TRACE and DEBUG levels are disabled by default for this crate when
62    /// using `veloren-common-frontend`.
63    pub async fn run(&mut self, metrics: Arc<Mutex<Metrics>>) -> Result<(), tokio::io::Error> {
64        let mut socket = UdpSocket::bind(self.addr).await?;
65
66        let gen_secret = || {
67            let mut rng = thread_rng();
68            (rng.gen::<u64>(), rng.gen::<u64>())
69        };
70        let mut secrets = gen_secret();
71        let mut last_secret_refresh = Instant::now();
72
73        let mut buf = Box::new([0; MAX_REQUEST_SIZE]);
74        loop {
75            let (len, remote_addr) = match socket.recv_from(&mut *buf).await {
76                Ok(v) => v,
77                Err(e) if e.kind() == ErrorKind::NotConnected => {
78                    error!(
79                        ?e,
80                        "Query server connection was closed, re-binding to socket..."
81                    );
82                    socket = UdpSocket::bind(self.addr).await?;
83                    continue;
84                },
85                err => {
86                    debug!(?err, "Error while receiving from query server socket");
87                    continue;
88                },
89            };
90
91            let mut new_metrics = Metrics {
92                received_packets: 1,
93                ..Default::default()
94            };
95
96            let raw_msg_buf = &buf[..len];
97            let msg_buf = if Self::validate_datagram(raw_msg_buf) {
98                // Require 2 extra bytes for version (currently unused)
99                &raw_msg_buf[2..(raw_msg_buf.len() - VELOREN_HEADER.len())]
100            } else {
101                new_metrics.dropped_packets += 1;
102                continue;
103            };
104
105            self.process_datagram(msg_buf, remote_addr, secrets, &mut new_metrics, &socket)
106                .await;
107
108            // Update metrics at the end of eath packet
109            if let Ok(mut metrics) = metrics.lock() {
110                *metrics += new_metrics;
111            }
112
113            {
114                let now = Instant::now();
115                if now.duration_since(last_secret_refresh) > SECRET_REGEN_INTERNVAL {
116                    last_secret_refresh = now;
117                    secrets = gen_secret();
118                }
119
120                self.ratelimit.maintain(now);
121            }
122        }
123    }
124
125    // Header must be discarded after this validation passes
126    fn validate_datagram(data: &[u8]) -> bool {
127        let len = data.len();
128        // Require 2 extra bytes for version (currently unused)
129        if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) {
130            trace!(?len, "Datagram too short");
131            false
132        } else if len > MAX_REQUEST_SIZE {
133            trace!(?len, "Datagram too large");
134            false
135        } else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER {
136            trace!(?len, "Datagram header invalid");
137            false
138        // TODO: Allow lower versions once proper versioning is added.
139        } else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION {
140            trace!(
141                "Datagram has invalid version {:?}, current {VERSION:?}",
142                &data[..2]
143            );
144            false
145        } else {
146            true
147        }
148    }
149
150    async fn process_datagram(
151        &mut self,
152        datagram: &[u8],
153        remote: SocketAddr,
154        secrets: (u64, u64),
155        metrics: &mut Metrics,
156        socket: &UdpSocket,
157    ) {
158        let Ok(RawQueryServerRequest {
159            p: client_p,
160            request,
161        }) =
162            <RawQueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
163        else {
164            metrics.invalid_packets += 1;
165            return;
166        };
167
168        trace!(?request, "Received packet");
169
170        #[allow(deprecated)]
171        let real_p = {
172            // Use SipHash-2-4 to compute the `p` value from a server specific
173            // secret and the client's address.
174            //
175            // This is used to verify that packets are from an entity that can
176            // receive packets at the given address.
177            //
178            // Only use the first 64 bits from Ipv6 addresses since the latter
179            // 64 bits can change very frequently (as much as for every
180            // request).
181            let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1);
182            ReducedIpAddr::from(remote.ip()).hash(&mut hasher);
183            hasher.finish()
184        };
185
186        if real_p != client_p {
187            Self::send_response(
188                RawQueryServerResponse::Init(Init {
189                    p: real_p,
190                    max_supported_version: VERSION,
191                }),
192                remote,
193                socket,
194                metrics,
195            )
196            .await;
197
198            return;
199        }
200
201        if !self.ratelimit.can_request(remote.ip().into()) {
202            trace!("Ratelimited request");
203            metrics.ratelimited += 1;
204            return;
205        }
206
207        match request {
208            QueryServerRequest::Init => {
209                metrics.init_requests += 1;
210                Self::send_response(
211                    RawQueryServerResponse::Init(Init {
212                        p: real_p,
213                        max_supported_version: VERSION,
214                    }),
215                    remote,
216                    socket,
217                    metrics,
218                )
219                .await;
220            },
221            QueryServerRequest::ServerInfo => {
222                metrics.info_requests += 1;
223                let server_info = *self.server_info.borrow();
224                Self::send_response(
225                    RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)),
226                    remote,
227                    socket,
228                    metrics,
229                )
230                .await;
231            },
232        }
233    }
234
235    async fn send_response(
236        response: RawQueryServerResponse,
237        addr: SocketAddr,
238        socket: &UdpSocket,
239        metrics: &mut Metrics,
240    ) {
241        // TODO: Once more versions are added, send the packet in the same version as
242        // the request here.
243        match <RawQueryServerResponse as Parcel>::raw_bytes(&response, &Default::default()) {
244            Ok(data) => {
245                if data.len() > MAX_RESPONSE_SIZE {
246                    error!(
247                        ?MAX_RESPONSE_SIZE,
248                        "Attempted to send a response larger than the maximum allowed size (size: \
249                         {}, response: {response:?})",
250                        data.len()
251                    );
252                    #[cfg(debug_assertions)]
253                    panic!(
254                        "Attempted to send a response larger than the maximum allowed size (size: \
255                         {}, max: {}, response: {response:?})",
256                        data.len(),
257                        MAX_RESPONSE_SIZE
258                    );
259                }
260
261                match socket.send_to(&data, addr).await {
262                    Ok(_) => {
263                        metrics.sent_responses += 1;
264                    },
265                    Err(err) => {
266                        metrics.failed_responses += 1;
267                        debug!(?err, "Failed to send query server response");
268                    },
269                }
270            },
271            Err(error) => {
272                trace!(?error, "Failed to serialize response");
273                #[cfg(debug_assertions)]
274                panic!("Serializing response failed: {error:?} ({response:?})");
275            },
276        }
277    }
278}
279
280impl std::ops::AddAssign for Metrics {
281    fn add_assign(
282        &mut self,
283        Self {
284            received_packets,
285            dropped_packets,
286            invalid_packets,
287            proccessing_errors,
288            info_requests,
289            init_requests,
290            sent_responses,
291            failed_responses,
292            timed_out_responses,
293            ratelimited,
294        }: Self,
295    ) {
296        self.received_packets += received_packets;
297        self.dropped_packets += dropped_packets;
298        self.invalid_packets += invalid_packets;
299        self.proccessing_errors += proccessing_errors;
300        self.info_requests += info_requests;
301        self.init_requests += init_requests;
302        self.sent_responses += sent_responses;
303        self.failed_responses += failed_responses;
304        self.timed_out_responses += timed_out_responses;
305        self.ratelimited += ratelimited;
306    }
307}
308
309impl Metrics {
310    /// Resets all metrics to 0 and returns previous ones
311    ///
312    /// Used by the consumer of the metrics.
313    pub fn reset(&mut self) -> Self { std::mem::take(self) }
314}