1use crate::dispatcher::Dispatcher;
18use crate::error::{Error, Result};
19use crate::events::GameEvent;
20use crate::handlers::diff_and_dispatch;
21use crate::model::GameState;
22
23use bytes::Bytes;
24use http_body_util::{BodyExt, Full, Limited};
25use hyper::body::Incoming;
26use hyper::server::conn::http1;
27use hyper::service::service_fn;
28use hyper::{Method, Request, Response, StatusCode};
29use hyper_util::rt::TokioIo;
30use parking_lot::RwLock;
31use std::any::Any;
32use std::io;
33use std::net::{IpAddr, Ipv4Addr, SocketAddr};
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::net::TcpListener;
37use tokio::sync::oneshot;
38use tokio::task::JoinHandle;
39use tracing::{debug, error, instrument, trace, warn};
40
/// Pause inserted between consecutive bind attempts on the same address when
/// the OS reports it as already in use.
const BIND_RETRY_DELAY: Duration = Duration::from_millis(250);

/// How many times a single address is tried before it is given up as busy.
const BIND_RETRY_ATTEMPTS: usize = 3;

/// Upper bound, in bytes, on a request body the HTTP handler will buffer (1 MiB).
const MAX_BODY_BYTES: usize = 1 << 20;
54
/// HTTP listener that accepts game-state payloads, remembers the most recent
/// one, and hands each old/new pair to `diff_and_dispatch` together with the
/// listener's [`Dispatcher`].
///
/// `last_state` and `runtime` live behind `Arc`, so every clone of a listener
/// observes the same stored state and the same running server.
#[derive(Clone)]
pub struct GameStateListener {
    /// Address requested at construction; the first one a start call tries to bind.
    addr: SocketAddr,
    /// Handler registry that `on` / `on_any` add to.
    dispatcher: Dispatcher,
    /// Most recent successfully parsed state; `None` until the first valid payload.
    last_state: Arc<RwLock<Option<GameState>>>,
    /// Bookkeeping for the currently running serve task, if any.
    runtime: Arc<RwLock<RuntimeHandle>>,
}
66
/// Mutable per-run bookkeeping for a started listener.
///
/// All fields are `None` in the default (never started) state.
#[derive(Default)]
struct RuntimeHandle {
    /// Sender used to ask the serve task to exit; its presence is what
    /// `is_running` reports on.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Join handle of the spawned serve task.
    join: Option<JoinHandle<Result<()>>>,
    /// Address the socket was actually bound to (relevant when port 0 or a
    /// fallback address was used).
    bound_addr: Option<SocketAddr>,
}
73
74impl GameStateListener {
75 pub fn new(port: u16) -> Self {
77 Self::with_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port))
78 }
79
80 pub fn with_addr(addr: SocketAddr) -> Self {
82 Self {
83 addr,
84 dispatcher: Dispatcher::default(),
85 last_state: Arc::new(RwLock::new(None)),
86 runtime: Arc::new(RwLock::new(RuntimeHandle::default())),
87 }
88 }
89
90 pub fn addr(&self) -> SocketAddr {
93 self.addr
94 }
95
96 pub fn actual_addr(&self) -> Option<SocketAddr> {
99 self.runtime.read().bound_addr
100 }
101
102 pub fn on<E, F>(&self, handler: F) -> &Self
112 where
113 E: Any + Send + Sync + 'static,
114 F: Fn(&E) + Send + Sync + 'static,
115 {
116 self.dispatcher.register(handler);
117 self
118 }
119
120 pub fn on_any<F>(&self, handler: F) -> &Self
122 where
123 F: Fn(&GameEvent) + Send + Sync + 'static,
124 {
125 self.dispatcher.register_any(handler);
126 self
127 }
128
129 pub fn current_state(&self) -> Option<GameState> {
131 self.last_state.read().clone()
132 }
133
134 pub fn is_running(&self) -> bool {
137 self.runtime.read().shutdown_tx.is_some()
138 }
139
140 #[instrument(level = "debug", skip(self), fields(addr = %self.addr))]
144 pub async fn start(&self) -> Result<()> {
145 self.start_with_fallbacks::<std::iter::Empty<SocketAddr>>(std::iter::empty())
146 .await
147 }
148
149 #[instrument(level = "debug", skip(self, fallbacks), fields(primary = %self.addr))]
163 pub async fn start_with_fallbacks<I>(&self, fallbacks: I) -> Result<()>
164 where
165 I: IntoIterator<Item = SocketAddr>,
166 {
167 if self.is_running() {
168 return Err(Error::AlreadyStarted);
169 }
170
171 let addrs: Vec<SocketAddr> = std::iter::once(self.addr).chain(fallbacks).collect();
172 let mut last_err: Option<(SocketAddr, io::Error)> = None;
173 let tcp = 'outer: {
174 for addr in &addrs {
175 match bind_with_retry(*addr).await {
176 Ok(t) => break 'outer t,
177 Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
178 debug!("address {addr} busy after retries, trying next fallback");
179 last_err = Some((*addr, e));
180 }
181 Err(e) => {
182 return Err(Error::Bind {
183 addr: addr.to_string(),
184 source: e,
185 });
186 }
187 }
188 }
189 let (_busy_addr, source) =
193 last_err.unwrap_or_else(|| (self.addr, io::Error::other("no addresses to try")));
194 return Err(Error::Bind {
195 addr: self.addr.to_string(),
196 source,
197 });
198 };
199 let bound = tcp.local_addr()?;
200
201 let dispatcher = self.dispatcher.clone();
202 let last_state = self.last_state.clone();
203 let (tx, rx) = oneshot::channel::<()>();
204
205 let join = tokio::spawn(serve_loop(tcp, dispatcher, last_state, rx));
206
207 let mut rt = self.runtime.write();
208 rt.shutdown_tx = Some(tx);
209 rt.join = Some(join);
210 rt.bound_addr = Some(bound);
211 debug!("GSI listener bound at {bound}");
212 Ok(())
213 }
214
215 pub async fn stop(&self) -> Result<()> {
217 let (tx, join) = {
218 let mut rt = self.runtime.write();
219 (rt.shutdown_tx.take(), rt.join.take())
220 };
221 let tx = tx.ok_or(Error::NotRunning)?;
222 let _ = tx.send(());
223 if let Some(handle) = join {
224 match handle.await {
225 Ok(Ok(())) => {}
226 Ok(Err(e)) => return Err(e),
227 Err(join_err) => {
228 warn!("listener join error: {join_err}");
229 }
230 }
231 }
232 self.runtime.write().bound_addr = None;
233 Ok(())
234 }
235}
236
237async fn bind_with_retry(addr: SocketAddr) -> io::Result<TcpListener> {
248 let mut last_err: Option<io::Error> = None;
249 for attempt in 0..BIND_RETRY_ATTEMPTS {
250 match TcpListener::bind(addr).await {
251 Ok(tcp) => return Ok(tcp),
252 Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
253 debug!(
254 "bind {addr} returned AddrInUse (attempt {}/{}), retrying in {:?}",
255 attempt + 1,
256 BIND_RETRY_ATTEMPTS,
257 BIND_RETRY_DELAY
258 );
259 last_err = Some(e);
260 if attempt + 1 < BIND_RETRY_ATTEMPTS {
263 tokio::time::sleep(BIND_RETRY_DELAY).await;
264 }
265 }
266 Err(e) => return Err(e),
267 }
268 }
269 Err(last_err.unwrap_or_else(|| io::Error::other("bind retry exhausted")))
270}
271
272#[instrument(level = "debug", skip_all, fields(addr = %tcp.local_addr().map(|a| a.to_string()).unwrap_or_default()))]
273async fn serve_loop(
274 tcp: TcpListener,
275 dispatcher: Dispatcher,
276 last_state: Arc<RwLock<Option<GameState>>>,
277 mut shutdown: oneshot::Receiver<()>,
278) -> Result<()> {
279 loop {
280 tokio::select! {
281 _ = &mut shutdown => {
282 debug!("shutdown signal received");
283 return Ok(());
284 }
285 accepted = tcp.accept() => {
286 let (stream, peer) = match accepted {
287 Ok(p) => p,
288 Err(e) => {
289 error!("accept error: {e}");
290 continue;
291 }
292 };
293 trace!("connection from {peer}");
294 let dispatcher = dispatcher.clone();
295 let last_state = last_state.clone();
296 tokio::spawn(async move {
297 let io = TokioIo::new(stream);
298 let svc = service_fn(move |req| {
299 let dispatcher = dispatcher.clone();
300 let last_state = last_state.clone();
301 async move { handle_request(req, dispatcher, last_state).await }
302 });
303 if let Err(e) = http1::Builder::new()
304 .keep_alive(true)
305 .serve_connection(io, svc)
306 .await
307 {
308 trace!("connection {peer} closed: {e}");
311 }
312 });
313 }
314 }
315 }
316}
317
318async fn handle_request(
319 req: Request<Incoming>,
320 dispatcher: Dispatcher,
321 last_state: Arc<RwLock<Option<GameState>>>,
322) -> std::result::Result<Response<Full<Bytes>>, hyper::Error> {
323 if req.method() != Method::POST {
324 return Ok(reply(
325 StatusCode::METHOD_NOT_ALLOWED,
326 "only POST is supported",
327 ));
328 }
329
330 let body = match Limited::new(req.into_body(), MAX_BODY_BYTES)
331 .collect()
332 .await
333 {
334 Ok(c) => c.to_bytes(),
335 Err(e) => {
336 warn!("body collect failed (cap {MAX_BODY_BYTES} bytes): {e}");
340 return Ok(reply(
341 StatusCode::PAYLOAD_TOO_LARGE,
342 "payload too large or read error",
343 ));
344 }
345 };
346
347 match GameState::from_slice(&body) {
348 Ok(state) => {
349 let prev = {
350 let mut guard = last_state.write();
351 let prev = guard.clone();
352 *guard = Some(state.clone());
353 prev
354 };
355 diff_and_dispatch(prev.as_ref(), &state, &dispatcher);
357 Ok(reply(StatusCode::OK, ""))
358 }
359 Err(e) => {
360 warn!("invalid GSI payload: {e}");
361 Ok(reply(StatusCode::BAD_REQUEST, "invalid payload"))
362 }
363 }
364}
365
366fn reply(status: StatusCode, body: &'static str) -> Response<Full<Bytes>> {
367 Response::builder()
368 .status(status)
369 .header("content-type", "text/plain; charset=utf-8")
370 .body(Full::new(Bytes::from_static(body.as_bytes())))
371 .expect("static response builder cannot fail")
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::PlayerDied;
    use std::net::SocketAddr;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Loopback address with port 0, i.e. "let the OS choose a free port".
    fn any_loopback_port() -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], 0))
    }

    /// Binds a throwaway socket on an OS-chosen port and reports where it landed.
    async fn squat() -> (TcpListener, SocketAddr) {
        let holder = TcpListener::bind(any_loopback_port()).await.unwrap();
        let held = holder.local_addr().unwrap();
        (holder, held)
    }

    /// Root URL of a started listener.
    fn root_url(listener: &GameStateListener) -> String {
        format!("http://{}/", listener.actual_addr().unwrap())
    }

    /// Minimal GSI payload for one player with the given name and health.
    fn payload_with_health(name: &str, hp: i32) -> String {
        format!(
            r#"{{"provider":{{"name":"Counter-Strike 2","appid":"730","version":"14000","steamid":"7656"}},"player":{{"steamid":"7656","name":"{name}","team":"CT","activity":"playing","state":{{"health":"{hp}","armor":"100","money":"800","round_kills":"0","round_killhs":"0","round_totaldmg":"0","equip_value":"800","flashed":"0","smoked":"0","burning":"0"}}}}}}"#
        )
    }

    /// Two payloads (health 100, then 0) must fire exactly one `PlayerDied`.
    #[tokio::test]
    async fn end_to_end_player_died() {
        let listener = GameStateListener::with_addr(any_loopback_port());
        let deaths = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&deaths);
        listener.on(move |_e: &PlayerDied| {
            counter.fetch_add(1, Ordering::SeqCst);
        });
        listener.start().await.unwrap();

        let url = root_url(&listener);
        let client = reqwest::Client::new();
        for hp in [100, 0] {
            client
                .post(&url)
                .body(payload_with_health("alice", hp))
                .send()
                .await
                .unwrap();
        }

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(deaths.load(Ordering::SeqCst), 1);
        listener.stop().await.unwrap();
        assert!(!listener.is_running());
    }

    /// A GET request is answered with 405.
    #[tokio::test]
    async fn rejects_non_post() {
        let listener = GameStateListener::with_addr(any_loopback_port());
        listener.start().await.unwrap();

        let url = root_url(&listener);
        let resp = reqwest::Client::new().get(&url).send().await.unwrap();
        assert_eq!(resp.status().as_u16(), 405);

        listener.stop().await.unwrap();
    }

    /// The port is released while the retry loop is still running, so `start` wins.
    #[tokio::test]
    async fn bind_retry_succeeds_when_squatter_releases() {
        let (probe, addr) = squat().await;

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(150)).await;
            drop(probe);
        });

        let listener = GameStateListener::with_addr(addr);
        listener.start().await.expect("retry should win the race");
        listener.stop().await.unwrap();
    }

    /// A port that stays occupied yields a `Bind` error within bounded time.
    #[tokio::test]
    async fn bind_retry_eventually_surfaces_real_conflict() {
        let (squatter, addr) = squat().await;

        let listener = GameStateListener::with_addr(addr);
        let began = std::time::Instant::now();
        let err = listener.start().await.expect_err("must fail");
        let elapsed = began.elapsed();
        assert!(
            elapsed < Duration::from_secs(3),
            "bind retry should bound failure latency, took {elapsed:?}"
        );
        assert!(
            matches!(err, Error::Bind { .. }),
            "expected Bind error, got {err:?}"
        );
        drop(squatter);
    }

    /// With the primary and first fallback busy, the port-0 fallback is used.
    #[tokio::test]
    async fn start_with_fallbacks_picks_first_free_port() {
        let (primary_squatter, primary_addr) = squat().await;
        let (fallback1_squatter, fallback1_addr) = squat().await;

        let listener = GameStateListener::with_addr(primary_addr);
        let fallbacks = [fallback1_addr, any_loopback_port()];
        listener
            .start_with_fallbacks(fallbacks)
            .await
            .expect("port 0 fallback should bind");

        let bound = listener.actual_addr().unwrap();
        assert_ne!(bound, primary_addr, "should not have used busy primary");
        assert_ne!(bound, fallback1_addr, "should not have used busy fallback");
        assert_ne!(bound.port(), 0, "OS must have assigned a real port");

        listener.stop().await.unwrap();
        drop(primary_squatter);
        drop(fallback1_squatter);
    }
}