1#[allow(deprecated)] use std::hash::SipHasher;
2use std::{
3 hash::{Hash, Hasher},
4 io::{self, ErrorKind},
5 net::SocketAddr,
6 sync::{Arc, Mutex},
7 time::{Duration, Instant},
8};
9
10use protocol::Parcel;
11use rand::{thread_rng, Rng};
12use tokio::{net::UdpSocket, sync::watch};
13use tracing::{debug, error, trace};
14
15use crate::{
16 proto::{
17 Init, QueryServerRequest, QueryServerResponse, RawQueryServerRequest,
18 RawQueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER,
19 VERSION,
20 },
21 ratelimit::{RateLimiter, ReducedIpAddr},
22};
23
24const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(300);
25
26pub struct QueryServer {
27 addr: SocketAddr,
28 server_info: watch::Receiver<ServerInfo>,
29 settings: protocol::Settings,
30 ratelimit: RateLimiter,
31}
32
33#[derive(Default, Clone, Copy, Debug)]
34pub struct Metrics {
35 pub received_packets: u32,
36 pub dropped_packets: u32,
37 pub invalid_packets: u32,
38 pub proccessing_errors: u32,
39 pub info_requests: u32,
40 pub init_requests: u32,
41 pub sent_responses: u32,
42 pub failed_responses: u32,
43 pub timed_out_responses: u32,
44 pub ratelimited: u32,
45}
46
47impl QueryServer {
48 pub fn new(addr: SocketAddr, server_info: watch::Receiver<ServerInfo>, ratelimit: u16) -> Self {
49 Self {
50 addr,
51 server_info,
52 ratelimit: RateLimiter::new(ratelimit),
53 settings: Default::default(),
54 }
55 }
56
57 pub async fn run(&mut self, metrics: Arc<Mutex<Metrics>>) -> Result<(), tokio::io::Error> {
64 let mut socket = UdpSocket::bind(self.addr).await?;
65
66 let gen_secret = || {
67 let mut rng = thread_rng();
68 (rng.gen::<u64>(), rng.gen::<u64>())
69 };
70 let mut secrets = gen_secret();
71 let mut last_secret_refresh = Instant::now();
72
73 let mut buf = Box::new([0; MAX_REQUEST_SIZE]);
74 loop {
75 let (len, remote_addr) = match socket.recv_from(&mut *buf).await {
76 Ok(v) => v,
77 Err(e) if e.kind() == ErrorKind::NotConnected => {
78 error!(
79 ?e,
80 "Query server connection was closed, re-binding to socket..."
81 );
82 socket = UdpSocket::bind(self.addr).await?;
83 continue;
84 },
85 err => {
86 debug!(?err, "Error while receiving from query server socket");
87 continue;
88 },
89 };
90
91 let mut new_metrics = Metrics {
92 received_packets: 1,
93 ..Default::default()
94 };
95
96 let raw_msg_buf = &buf[..len];
97 let msg_buf = if Self::validate_datagram(raw_msg_buf) {
98 &raw_msg_buf[2..(raw_msg_buf.len() - VELOREN_HEADER.len())]
100 } else {
101 new_metrics.dropped_packets += 1;
102 continue;
103 };
104
105 self.process_datagram(msg_buf, remote_addr, secrets, &mut new_metrics, &socket)
106 .await;
107
108 if let Ok(mut metrics) = metrics.lock() {
110 *metrics += new_metrics;
111 }
112
113 {
114 let now = Instant::now();
115 if now.duration_since(last_secret_refresh) > SECRET_REGEN_INTERNVAL {
116 last_secret_refresh = now;
117 secrets = gen_secret();
118 }
119
120 self.ratelimit.maintain(now);
121 }
122 }
123 }
124
125 fn validate_datagram(data: &[u8]) -> bool {
127 let len = data.len();
128 if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) {
130 trace!(?len, "Datagram too short");
131 false
132 } else if len > MAX_REQUEST_SIZE {
133 trace!(?len, "Datagram too large");
134 false
135 } else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER {
136 trace!(?len, "Datagram header invalid");
137 false
138 } else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION {
140 trace!(
141 "Datagram has invalid version {:?}, current {VERSION:?}",
142 &data[..2]
143 );
144 false
145 } else {
146 true
147 }
148 }
149
150 async fn process_datagram(
151 &mut self,
152 datagram: &[u8],
153 remote: SocketAddr,
154 secrets: (u64, u64),
155 metrics: &mut Metrics,
156 socket: &UdpSocket,
157 ) {
158 let Ok(RawQueryServerRequest {
159 p: client_p,
160 request,
161 }) =
162 <RawQueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
163 else {
164 metrics.invalid_packets += 1;
165 return;
166 };
167
168 trace!(?request, "Received packet");
169
170 #[allow(deprecated)]
171 let real_p = {
172 let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1);
182 ReducedIpAddr::from(remote.ip()).hash(&mut hasher);
183 hasher.finish()
184 };
185
186 if real_p != client_p {
187 Self::send_response(
188 RawQueryServerResponse::Init(Init {
189 p: real_p,
190 max_supported_version: VERSION,
191 }),
192 remote,
193 socket,
194 metrics,
195 )
196 .await;
197
198 return;
199 }
200
201 if !self.ratelimit.can_request(remote.ip().into()) {
202 trace!("Ratelimited request");
203 metrics.ratelimited += 1;
204 return;
205 }
206
207 match request {
208 QueryServerRequest::Init => {
209 metrics.init_requests += 1;
210 Self::send_response(
211 RawQueryServerResponse::Init(Init {
212 p: real_p,
213 max_supported_version: VERSION,
214 }),
215 remote,
216 socket,
217 metrics,
218 )
219 .await;
220 },
221 QueryServerRequest::ServerInfo => {
222 metrics.info_requests += 1;
223 let server_info = *self.server_info.borrow();
224 Self::send_response(
225 RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)),
226 remote,
227 socket,
228 metrics,
229 )
230 .await;
231 },
232 }
233 }
234
235 async fn send_response(
236 response: RawQueryServerResponse,
237 addr: SocketAddr,
238 socket: &UdpSocket,
239 metrics: &mut Metrics,
240 ) {
241 match <RawQueryServerResponse as Parcel>::raw_bytes(&response, &Default::default()) {
244 Ok(data) => {
245 if data.len() > MAX_RESPONSE_SIZE {
246 error!(
247 ?MAX_RESPONSE_SIZE,
248 "Attempted to send a response larger than the maximum allowed size (size: \
249 {}, response: {response:?})",
250 data.len()
251 );
252 #[cfg(debug_assertions)]
253 panic!(
254 "Attempted to send a response larger than the maximum allowed size (size: \
255 {}, max: {}, response: {response:?})",
256 data.len(),
257 MAX_RESPONSE_SIZE
258 );
259 }
260
261 match socket.send_to(&data, addr).await {
262 Ok(_) => {
263 metrics.sent_responses += 1;
264 },
265 Err(err) => {
266 metrics.failed_responses += 1;
267 debug!(?err, "Failed to send query server response");
268 },
269 }
270 },
271 Err(error) => {
272 trace!(?error, "Failed to serialize response");
273 #[cfg(debug_assertions)]
274 panic!("Serializing response failed: {error:?} ({response:?})");
275 },
276 }
277 }
278}
279
280impl std::ops::AddAssign for Metrics {
281 fn add_assign(
282 &mut self,
283 Self {
284 received_packets,
285 dropped_packets,
286 invalid_packets,
287 proccessing_errors,
288 info_requests,
289 init_requests,
290 sent_responses,
291 failed_responses,
292 timed_out_responses,
293 ratelimited,
294 }: Self,
295 ) {
296 self.received_packets += received_packets;
297 self.dropped_packets += dropped_packets;
298 self.invalid_packets += invalid_packets;
299 self.proccessing_errors += proccessing_errors;
300 self.info_requests += info_requests;
301 self.init_requests += init_requests;
302 self.sent_responses += sent_responses;
303 self.failed_responses += failed_responses;
304 self.timed_out_responses += timed_out_responses;
305 self.ratelimited += ratelimited;
306 }
307}
308
309impl Metrics {
310 pub fn reset(&mut self) -> Self { std::mem::take(self) }
314}