// cs2_gsi/listener.rs
//! HTTP listener that receives GSI POST payloads from CS2.
//!
//! `GameStateListener` is the single entry point of the library.
//!
//! ```no_run
//! use cs2_gsi::{events::PlayerDied, GameStateListener};
//!
//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
//! let listener = GameStateListener::new(4000);
//! listener.on(|e: &PlayerDied| {
//!     println!("{} died at {} HP", e.player.name, e.previous_health);
//! });
//! listener.start().await?;
//! # Ok(()) }
//! ```

17use crate::dispatcher::Dispatcher;
18use crate::error::{Error, Result};
19use crate::events::GameEvent;
20use crate::handlers::diff_and_dispatch;
21use crate::model::GameState;
22
23use bytes::Bytes;
24use http_body_util::{BodyExt, Full, Limited};
25use hyper::body::Incoming;
26use hyper::server::conn::http1;
27use hyper::service::service_fn;
28use hyper::{Method, Request, Response, StatusCode};
29use hyper_util::rt::TokioIo;
30use parking_lot::RwLock;
31use std::any::Any;
32use std::io;
33use std::net::{IpAddr, Ipv4Addr, SocketAddr};
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::net::TcpListener;
37use tokio::sync::oneshot;
38use tokio::task::JoinHandle;
39use tracing::{debug, error, instrument, trace, warn};
40
41/// How long to wait between bind retries when the address is reported as
42/// in use. Tuned for the typical TIME_WAIT / dev-loop hand-off window —
43/// long enough to outlast a parent `cargo tauri dev` rebuild but short
44/// enough that a real port conflict surfaces in well under a second so
45/// the caller can fall back to an alternative.
46const BIND_RETRY_DELAY: Duration = Duration::from_millis(250);
47const BIND_RETRY_ATTEMPTS: usize = 3;
48
49/// Hard cap on POST body size, in bytes. Real GSI payloads are well under
50/// 100 KB even at full data-section coverage; 1 MiB is a generous ceiling
51/// that prevents a misbehaving (or malicious) local sender from feeding
52/// the listener arbitrary memory.
53const MAX_BODY_BYTES: usize = 1024 * 1024;
54
55/// HTTP listener for CS2 GSI payloads.
56///
57/// Cheap to clone — every clone shares the same dispatcher and last-state
58/// cache. Handlers registered through any clone fire on every payload.
59#[derive(Clone)]
60pub struct GameStateListener {
61    addr: SocketAddr,
62    dispatcher: Dispatcher,
63    last_state: Arc<RwLock<Option<GameState>>>,
64    runtime: Arc<RwLock<RuntimeHandle>>,
65}
66
67#[derive(Default)]
68struct RuntimeHandle {
69    shutdown_tx: Option<oneshot::Sender<()>>,
70    join: Option<JoinHandle<Result<()>>>,
71    bound_addr: Option<SocketAddr>,
72}
73
74impl GameStateListener {
75    /// Create a listener that will bind to `127.0.0.1:<port>` when started.
76    pub fn new(port: u16) -> Self {
77        Self::with_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port))
78    }
79
80    /// Create a listener with a fully specified bind address.
81    pub fn with_addr(addr: SocketAddr) -> Self {
82        Self {
83            addr,
84            dispatcher: Dispatcher::default(),
85            last_state: Arc::new(RwLock::new(None)),
86            runtime: Arc::new(RwLock::new(RuntimeHandle::default())),
87        }
88    }
89
90    /// The configured bind address. After [`start`](Self::start) succeeds,
91    /// this is also returned by [`actual_addr`](Self::actual_addr).
92    pub fn addr(&self) -> SocketAddr {
93        self.addr
94    }
95
96    /// The actual bound socket address — useful when you bind to port `0`
97    /// and want to discover the OS-assigned port.
98    pub fn actual_addr(&self) -> Option<SocketAddr> {
99        self.runtime.read().bound_addr
100    }
101
102    /// Subscribe to a strongly typed event.
103    ///
104    /// ```no_run
105    /// # use cs2_gsi::{events::PlayerGotKill, GameStateListener};
106    /// let gsl = GameStateListener::new(4000);
107    /// gsl.on(|e: &PlayerGotKill| {
108    ///     println!("{} now has {} round kills", e.player.name, e.new_round_kills);
109    /// });
110    /// ```
111    pub fn on<E, F>(&self, handler: F) -> &Self
112    where
113        E: Any + Send + Sync + 'static,
114        F: Fn(&E) + Send + Sync + 'static,
115    {
116        self.dispatcher.register(handler);
117        self
118    }
119
120    /// Subscribe to every event as a [`GameEvent`].
121    pub fn on_any<F>(&self, handler: F) -> &Self
122    where
123        F: Fn(&GameEvent) + Send + Sync + 'static,
124    {
125        self.dispatcher.register_any(handler);
126        self
127    }
128
129    /// The most recently received game state, if any.
130    pub fn current_state(&self) -> Option<GameState> {
131        self.last_state.read().clone()
132    }
133
134    /// `true` once [`start`](Self::start) has succeeded and before
135    /// [`stop`](Self::stop) is called.
136    pub fn is_running(&self) -> bool {
137        self.runtime.read().shutdown_tx.is_some()
138    }
139
140    /// Bind the listener and start accepting GSI payloads. Returns once the
141    /// socket is bound — the actual serve loop runs as a background tokio
142    /// task. Call [`stop`](Self::stop) to shut it down.
143    #[instrument(level = "debug", skip(self), fields(addr = %self.addr))]
144    pub async fn start(&self) -> Result<()> {
145        self.start_with_fallbacks::<std::iter::Empty<SocketAddr>>(std::iter::empty())
146            .await
147    }
148
149    /// Bind, falling back to alternative addresses if the primary is busy.
150    ///
151    /// Tries `self.addr` first. If that address is reported as in use
152    /// (after the per-address retry budget is exhausted), each fallback
153    /// is tried in turn. The first address that binds wins; the actual
154    /// chosen address is then available via
155    /// [`actual_addr`](Self::actual_addr).
156    ///
157    /// Pass `port = 0` as a final fallback to ask the OS to pick any
158    /// free ephemeral port — that bind effectively cannot fail.
159    ///
160    /// All non-`AddrInUse` errors short-circuit immediately (no point
161    /// trying fallbacks if e.g. the loopback interface is gone).
162    #[instrument(level = "debug", skip(self, fallbacks), fields(primary = %self.addr))]
163    pub async fn start_with_fallbacks<I>(&self, fallbacks: I) -> Result<()>
164    where
165        I: IntoIterator<Item = SocketAddr>,
166    {
167        if self.is_running() {
168            return Err(Error::AlreadyStarted);
169        }
170
171        let addrs: Vec<SocketAddr> = std::iter::once(self.addr).chain(fallbacks).collect();
172        let mut last_err: Option<(SocketAddr, io::Error)> = None;
173        let tcp = 'outer: {
174            for addr in &addrs {
175                match bind_with_retry(*addr).await {
176                    Ok(t) => break 'outer t,
177                    Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
178                        debug!("address {addr} busy after retries, trying next fallback");
179                        last_err = Some((*addr, e));
180                    }
181                    Err(e) => {
182                        return Err(Error::Bind {
183                            addr: addr.to_string(),
184                            source: e,
185                        });
186                    }
187                }
188            }
189            // Every candidate was AddrInUse — surface the *last* one's
190            // error against the *primary* address (it's the one the
191            // caller actually asked for).
192            let (_busy_addr, source) =
193                last_err.unwrap_or_else(|| (self.addr, io::Error::other("no addresses to try")));
194            return Err(Error::Bind {
195                addr: self.addr.to_string(),
196                source,
197            });
198        };
199        let bound = tcp.local_addr()?;
200
201        let dispatcher = self.dispatcher.clone();
202        let last_state = self.last_state.clone();
203        let (tx, rx) = oneshot::channel::<()>();
204
205        let join = tokio::spawn(serve_loop(tcp, dispatcher, last_state, rx));
206
207        let mut rt = self.runtime.write();
208        rt.shutdown_tx = Some(tx);
209        rt.join = Some(join);
210        rt.bound_addr = Some(bound);
211        debug!("GSI listener bound at {bound}");
212        Ok(())
213    }
214
215    /// Signal the serve loop to exit and wait for it to finish.
216    pub async fn stop(&self) -> Result<()> {
217        let (tx, join) = {
218            let mut rt = self.runtime.write();
219            (rt.shutdown_tx.take(), rt.join.take())
220        };
221        let tx = tx.ok_or(Error::NotRunning)?;
222        let _ = tx.send(());
223        if let Some(handle) = join {
224            match handle.await {
225                Ok(Ok(())) => {}
226                Ok(Err(e)) => return Err(e),
227                Err(join_err) => {
228                    warn!("listener join error: {join_err}");
229                }
230            }
231        }
232        self.runtime.write().bound_addr = None;
233        Ok(())
234    }
235}
236
237/// Bind to `addr`, retrying briefly on `AddrInUse`.
238///
239/// Targets the *real* failure mode in dev: when a watcher (cargo tauri
240/// dev, cargo-watch, …) restarts the process, the previous binary's
241/// socket is usually still in TIME_WAIT for a fraction of a second and
242/// the new bind would otherwise return `WSAEADDRINUSE` (Windows) /
243/// `EADDRINUSE` (Linux/macOS). Retries are bounded — a genuine port
244/// conflict surfaces in roughly
245/// `BIND_RETRY_ATTEMPTS * BIND_RETRY_DELAY` (≈ 750 ms with the current
246/// 3 × 250 ms tuning) with the original error.
247async fn bind_with_retry(addr: SocketAddr) -> io::Result<TcpListener> {
248    let mut last_err: Option<io::Error> = None;
249    for attempt in 0..BIND_RETRY_ATTEMPTS {
250        match TcpListener::bind(addr).await {
251            Ok(tcp) => return Ok(tcp),
252            Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
253                debug!(
254                    "bind {addr} returned AddrInUse (attempt {}/{}), retrying in {:?}",
255                    attempt + 1,
256                    BIND_RETRY_ATTEMPTS,
257                    BIND_RETRY_DELAY
258                );
259                last_err = Some(e);
260                // Skip the trailing sleep on the last attempt — the caller
261                // can immediately fall back to the next candidate address.
262                if attempt + 1 < BIND_RETRY_ATTEMPTS {
263                    tokio::time::sleep(BIND_RETRY_DELAY).await;
264                }
265            }
266            Err(e) => return Err(e),
267        }
268    }
269    Err(last_err.unwrap_or_else(|| io::Error::other("bind retry exhausted")))
270}
271
272#[instrument(level = "debug", skip_all, fields(addr = %tcp.local_addr().map(|a| a.to_string()).unwrap_or_default()))]
273async fn serve_loop(
274    tcp: TcpListener,
275    dispatcher: Dispatcher,
276    last_state: Arc<RwLock<Option<GameState>>>,
277    mut shutdown: oneshot::Receiver<()>,
278) -> Result<()> {
279    loop {
280        tokio::select! {
281            _ = &mut shutdown => {
282                debug!("shutdown signal received");
283                return Ok(());
284            }
285            accepted = tcp.accept() => {
286                let (stream, peer) = match accepted {
287                    Ok(p) => p,
288                    Err(e) => {
289                        error!("accept error: {e}");
290                        continue;
291                    }
292                };
293                trace!("connection from {peer}");
294                let dispatcher = dispatcher.clone();
295                let last_state = last_state.clone();
296                tokio::spawn(async move {
297                    let io = TokioIo::new(stream);
298                    let svc = service_fn(move |req| {
299                        let dispatcher = dispatcher.clone();
300                        let last_state = last_state.clone();
301                        async move { handle_request(req, dispatcher, last_state).await }
302                    });
303                    if let Err(e) = http1::Builder::new()
304                        .keep_alive(true)
305                        .serve_connection(io, svc)
306                        .await
307                    {
308                        // CS2 occasionally drops the connection mid-keepalive
309                        // — log at trace level so it doesn't spam.
310                        trace!("connection {peer} closed: {e}");
311                    }
312                });
313            }
314        }
315    }
316}
317
318async fn handle_request(
319    req: Request<Incoming>,
320    dispatcher: Dispatcher,
321    last_state: Arc<RwLock<Option<GameState>>>,
322) -> std::result::Result<Response<Full<Bytes>>, hyper::Error> {
323    if req.method() != Method::POST {
324        return Ok(reply(
325            StatusCode::METHOD_NOT_ALLOWED,
326            "only POST is supported",
327        ));
328    }
329
330    let body = match Limited::new(req.into_body(), MAX_BODY_BYTES)
331        .collect()
332        .await
333    {
334        Ok(c) => c.to_bytes(),
335        Err(e) => {
336            // `Limited` returns a boxed error on overflow; we cannot tell it
337            // apart from a transport read error without downcasting, so
338            // surface 413 with the underlying detail in logs.
339            warn!("body collect failed (cap {MAX_BODY_BYTES} bytes): {e}");
340            return Ok(reply(
341                StatusCode::PAYLOAD_TOO_LARGE,
342                "payload too large or read error",
343            ));
344        }
345    };
346
347    match GameState::from_slice(&body) {
348        Ok(state) => {
349            let prev = {
350                let mut guard = last_state.write();
351                let prev = guard.clone();
352                *guard = Some(state.clone());
353                prev
354            };
355            // Diff & dispatch synchronously — keep deterministic ordering.
356            diff_and_dispatch(prev.as_ref(), &state, &dispatcher);
357            Ok(reply(StatusCode::OK, ""))
358        }
359        Err(e) => {
360            warn!("invalid GSI payload: {e}");
361            Ok(reply(StatusCode::BAD_REQUEST, "invalid payload"))
362        }
363    }
364}
365
366fn reply(status: StatusCode, body: &'static str) -> Response<Full<Bytes>> {
367    Response::builder()
368        .status(status)
369        .header("content-type", "text/plain; charset=utf-8")
370        .body(Full::new(Bytes::from_static(body.as_bytes())))
371        .expect("static response builder cannot fail")
372}
373
374#[cfg(test)]
375mod tests {
376    use super::*;
377    use crate::events::PlayerDied;
378    use std::net::SocketAddr;
379    use std::sync::atomic::{AtomicUsize, Ordering};
380    use std::sync::Arc;
381    use std::time::Duration;
382
383    fn payload_with_health(name: &str, hp: i32) -> String {
384        format!(
385            r#"{{"provider":{{"name":"Counter-Strike 2","appid":"730","version":"14000","steamid":"7656"}},"player":{{"steamid":"7656","name":"{name}","team":"CT","activity":"playing","state":{{"health":"{hp}","armor":"100","money":"800","round_kills":"0","round_killhs":"0","round_totaldmg":"0","equip_value":"800","flashed":"0","smoked":"0","burning":"0"}}}}}}"#
386        )
387    }
388
389    #[tokio::test]
390    async fn end_to_end_player_died() {
391        let listener = GameStateListener::with_addr(SocketAddr::from(([127, 0, 0, 1], 0)));
392        let died = Arc::new(AtomicUsize::new(0));
393        let died2 = died.clone();
394        listener.on(move |_e: &PlayerDied| {
395            died2.fetch_add(1, Ordering::SeqCst);
396        });
397        listener.start().await.unwrap();
398        let url = format!("http://{}/", listener.actual_addr().unwrap());
399        let client = reqwest::Client::new();
400        client
401            .post(&url)
402            .body(payload_with_health("alice", 100))
403            .send()
404            .await
405            .unwrap();
406        client
407            .post(&url)
408            .body(payload_with_health("alice", 0))
409            .send()
410            .await
411            .unwrap();
412        // Allow the spawned diff to run.
413        tokio::time::sleep(Duration::from_millis(50)).await;
414        assert_eq!(died.load(Ordering::SeqCst), 1);
415        listener.stop().await.unwrap();
416        assert!(!listener.is_running());
417    }
418
419    #[tokio::test]
420    async fn rejects_non_post() {
421        let listener = GameStateListener::with_addr(SocketAddr::from(([127, 0, 0, 1], 0)));
422        listener.start().await.unwrap();
423        let url = format!("http://{}/", listener.actual_addr().unwrap());
424        let resp = reqwest::Client::new().get(&url).send().await.unwrap();
425        assert_eq!(resp.status().as_u16(), 405);
426        listener.stop().await.unwrap();
427    }
428
429    #[tokio::test]
430    async fn bind_retry_succeeds_when_squatter_releases() {
431        // Pin a port by binding briefly, releasing it, and pinning the
432        // *same* port — emulating the dev-restart TIME_WAIT window.
433        let probe = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
434            .await
435            .unwrap();
436        let addr = probe.local_addr().unwrap();
437
438        // Start a task that holds the port for ~150ms then drops it,
439        // well within the retry budget (6 × 250ms = 1.5s).
440        tokio::spawn(async move {
441            tokio::time::sleep(Duration::from_millis(150)).await;
442            drop(probe);
443        });
444
445        let listener = GameStateListener::with_addr(addr);
446        // Without retry, this would race and frequently fail; with
447        // retry, the squatter releases on attempt 1 or 2 and the bind
448        // succeeds.
449        listener.start().await.expect("retry should win the race");
450        listener.stop().await.unwrap();
451    }
452
453    #[tokio::test]
454    async fn bind_retry_eventually_surfaces_real_conflict() {
455        // A held port that *never* releases must surface as Bind error
456        // within the retry budget — not hang forever.
457        let squatter = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
458            .await
459            .unwrap();
460        let addr = squatter.local_addr().unwrap();
461
462        let listener = GameStateListener::with_addr(addr);
463        let start = std::time::Instant::now();
464        let err = listener.start().await.expect_err("must fail");
465        let elapsed = start.elapsed();
466        // Total budget is 6 × 250ms = 1.5s; allow some slack.
467        assert!(
468            elapsed < Duration::from_secs(3),
469            "bind retry should bound failure latency, took {elapsed:?}"
470        );
471        match err {
472            Error::Bind { .. } => {}
473            other => panic!("expected Bind error, got {other:?}"),
474        }
475        drop(squatter);
476    }
477
478    #[tokio::test]
479    async fn start_with_fallbacks_picks_first_free_port() {
480        // Pin two adjacent ports as the "preferred" + "first fallback".
481        // The listener should walk past both and land on the second
482        // fallback (port 0 → OS-assigned), which always succeeds.
483        let primary_squatter = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
484            .await
485            .unwrap();
486        let primary_addr = primary_squatter.local_addr().unwrap();
487        let fallback1_squatter = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
488            .await
489            .unwrap();
490        let fallback1_addr = fallback1_squatter.local_addr().unwrap();
491
492        let listener = GameStateListener::with_addr(primary_addr);
493        listener
494            .start_with_fallbacks([fallback1_addr, SocketAddr::from(([127, 0, 0, 1], 0))])
495            .await
496            .expect("port 0 fallback should bind");
497
498        let bound = listener.actual_addr().unwrap();
499        assert_ne!(bound, primary_addr, "should not have used busy primary");
500        assert_ne!(bound, fallback1_addr, "should not have used busy fallback");
501        assert_ne!(bound.port(), 0, "OS must have assigned a real port");
502
503        listener.stop().await.unwrap();
504        drop(primary_squatter);
505        drop(fallback1_squatter);
506    }
507}