Skip to main content

dota/
lib.rs

1//! Game State Integration with Dota 2.
2//!
3//! Provides a [`Server`] that listens for JSON events sent by Dota 2. Enabling game state
4//! integration requires:
5//! 1. Creating a `.cfg` [configuration file] in the Dota 2 game configuration directory.
6//! 2. Running Dota 2 with the -gamestateintegration [launch option].
7//!
8//! The configuration file can have any name name, but must be prefixed by `gamestate_integration_`.
9//! For example, `gamestate_integration_test.cfg` would be located:
10//! * In Linux: `~/.steam/steam/steamapps/common/dota 2 beta/game/dota/cfg/gamestate_integration_test.cfg`
11//! * In Windows: `D:\Steam\steamapps\common\dota 2 beta\dota\cfg\gamestate_integration_test.cfg`
12//!
13//! Here's a sample configuration file:
14//!
15//! "dota2-gsi Configuration"
16//!{
17//!    "uri"               "http://127.0.0.1:53000/"
18//!    "timeout"           "5.0"
19//!    "buffer"            "0.1"
20//!    "throttle"          "0.1"
21//!    "heartbeat"         "30.0"
22//!    "data"
23//!    {
24//!        "buildings"     "1"
25//!        "provider"      "1"
26//!        "map"           "1"
27//!        "player"        "1"
28//!        "hero"          "1"
29//!        "abilities"     "1"
30//!        "items"         "1"
31//!        "draft"         "1"
32//!        "wearables"     "1"
33//!    }
34//!    "auth"
35//!    {
36//!        "token"         "abcdefghijklmopqrstuvxyz123456789"
37//!    }
38//!}
39//!
40//! Note the URI used in the configuration file must be the same URI used with a [`ServerBuilder`].
41//!
42//! [^configuration file]: Details on configuration file: https://developer.valvesoftware.com/wiki/Counter-Strike:_Global_Offensive_Game_State_Integration
43//! [^launch option]: Available launch options: https://help.steampowered.com/en/faqs/view/7d01-d2dd-d75e-2955
44use std::error::Error as StdError;
45use std::fmt::{self, Display};
46use std::future::Future;
47use std::io;
48
49use async_trait::async_trait;
50use futures::{StreamExt, stream};
51use tokio::io::{AsyncReadExt, AsyncWriteExt};
52use tokio::net::{TcpListener, TcpStream};
53use tokio::sync::broadcast;
54use tokio::task;
55
56#[cfg(feature = "models")]
57pub mod components;
58#[cfg(feature = "diff")]
59pub mod diff;
60#[cfg(feature = "diff")]
61pub mod event;
62#[cfg(feature = "handlers")]
63pub mod handlers;
64
65/// The payload sent on every game state integration request is usually between 50-60kb.
66/// We initialize a buffer to read the request with this initial capacity.
67/// The code then looks at the Content-Length header to reserve the required capacity.
68const INITIAL_REQUEST_BUFFER_CAPACITY_BYTES: usize = 1024;
69
70/// The POST game state integration request includes this number of headers.
71/// We parse them to find the Content-Length.
72const EXPECTED_NUMBER_OF_HEADERS: usize = 7;
73
74/// The response expected for every game state integration request.
75/// Failure to deliver this response would cause the request to be retried infinitely.
76const OK: &str = "HTTP/1.1 200 OK\ncontent-type: text/html\n";
77
78#[derive(thiserror::Error, Debug)]
79pub enum GameStateIntegrationError {
80    #[error("incomplete headers from game state integration request")]
81    IncompleteHeaders,
82    #[error("failed to read from socket")]
83    SocketRead(#[from] io::Error),
84    #[error("no handlers available to process request, is the server shutting down?")]
85    NoHandlersAvailable,
86    #[error("invalid content length header: {0}")]
87    InvalidContentLength(String),
88    #[error("missing Content-Length header in request")]
89    MissingContentLengthHeader,
90    #[error("invalid request received")]
91    InvalidRequest(#[from] httparse::Error),
92    #[error("unexpected EOF")]
93    UnexpectedEOF,
94    #[error("server has already shutdown")]
95    ServerShutdown,
96    #[error("handler failed when handling event")]
97    Handler {
98        #[source]
99        source: anyhow::Error,
100    },
101    #[error("an error occurred while running the server")]
102    Unknown(#[from] task::JoinError),
103}
104
105pub type HandlerResult = Result<(), anyhow::Error>;
106
107/// Trait for any async function or struct that can be used to handle game state integration events.
108///
109/// This trait is automatically implemented for async functions and closures.
110#[async_trait]
111pub trait Handler: Send + Sync + 'static {
112    async fn handle(&self, event: bytes::Bytes) -> HandlerResult;
113}
114
115/// Mutable version of the [`Handler`] trait
116#[async_trait]
117pub trait MutHandler: Send + Sync + 'static {
118    async fn handle(&mut self, event: bytes::Bytes) -> HandlerResult;
119}
120
121#[async_trait]
122impl<F, Fut, E> Handler for F
123where
124    F: Fn(bytes::Bytes) -> Fut + Send + Sync + 'static,
125    Fut: Future<Output = Result<(), E>> + Send,
126    E: Into<anyhow::Error>,
127{
128    async fn handle(&self, event: bytes::Bytes) -> HandlerResult {
129        (self)(event).await.map_err(|e| e.into())?;
130        Ok(())
131    }
132}
133
134/// Manage lifecycle of a handler registered in a server
135pub(crate) struct HandlerRegistration {
136    inner: Box<dyn Handler>,
137    notify: broadcast::Receiver<()>,
138    events: broadcast::Receiver<bytes::Bytes>,
139}
140
141impl HandlerRegistration {
142    pub(crate) fn new<H>(
143        handler: H,
144        notify: broadcast::Receiver<()>,
145        events: broadcast::Receiver<bytes::Bytes>,
146    ) -> Self
147    where
148        H: Handler,
149    {
150        Self {
151            inner: Box::new(handler),
152            notify,
153            events,
154        }
155    }
156
157    pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
158        loop {
159            tokio::select! {
160                received = self.events.recv() => {
161                    match received {
162                        Ok(event) => {
163                            if let Err(e) = self.inner.handle(event).await {
164                                return Err(GameStateIntegrationError::Handler{source: e});
165                            };
166                        },
167                        Err(_) => {break;}
168                    }
169                }
170                _ = self.notify.recv() => {
171                    break;
172                }
173            }
174        }
175
176        Ok(())
177    }
178}
179
180/// Manage lifecycle of a handler registered in a server
181pub(crate) struct MutHandlerRegistration {
182    inner: Box<dyn MutHandler>,
183    notify: broadcast::Receiver<()>,
184    events: broadcast::Receiver<bytes::Bytes>,
185}
186
187impl MutHandlerRegistration {
188    pub(crate) fn new<H>(
189        handler: H,
190        notify: broadcast::Receiver<()>,
191        events: broadcast::Receiver<bytes::Bytes>,
192    ) -> Self
193    where
194        H: MutHandler,
195    {
196        Self {
197            inner: Box::new(handler),
198            notify,
199            events,
200        }
201    }
202
203    pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
204        loop {
205            tokio::select! {
206                received = self.events.recv() => {
207                    match received {
208                        Ok(event) => {
209                            if let Err(e) = self.inner.handle(event).await {
210                                return Err(GameStateIntegrationError::Handler{source: e});
211                            };
212                        },
213                        Err(_) => {break;}
214                    }
215                }
216                _ = self.notify.recv() => {
217                    break;
218                }
219            }
220        }
221
222        Ok(())
223    }
224}
225
226/// Manage lifecycle of a server's listening task
227pub(crate) struct Listener {
228    uri: String,
229    notify: broadcast::Receiver<()>,
230    send_events: broadcast::Sender<bytes::Bytes>,
231}
232
233impl Listener {
234    pub(crate) fn new(
235        uri: &str,
236        notify: broadcast::Receiver<()>,
237        send_events: broadcast::Sender<bytes::Bytes>,
238    ) -> Self {
239        Self {
240            uri: uri.to_owned(),
241            notify,
242            send_events,
243        }
244    }
245
246    pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
247        let listener = TcpListener::bind(&self.uri).await?;
248        log::info!("Listening on: {:?}", listener.local_addr());
249
250        loop {
251            tokio::select! {
252                accepted = listener.accept() => {
253                    let (socket, _) = match accepted {
254                        Ok(val) => val,
255                        Err(e) => {
256                            return Err(GameStateIntegrationError::SocketRead(e));
257                        }
258                    };
259
260                    if self.send_events.receiver_count() == 0 {
261                        // terminate if no handlers available
262                        return Err(GameStateIntegrationError::NoHandlersAvailable);
263                    }
264
265                    let sender = self.send_events.clone();
266
267                    tokio::spawn(async move {
268                        match process(socket).await {
269                            Err(e) => {
270                                log::error!("{}", e);
271                                Err(e)
272                            }
273                            Ok(buf) => match sender.send(buf) {
274                                Ok(_) => Ok(()),
275                                Err(_) => {
276                                    // send can only fail if there are no active receivers
277                                    // meaning no where registered or the server is shutting down.
278                                    Err(GameStateIntegrationError::NoHandlersAvailable)
279                                }
280                            },
281                        }
282                    });
283
284                }
285                _ = self.notify.recv() => {
286                    break;
287                }
288            }
289        }
290
291        Ok(())
292    }
293}
294
295#[derive(Debug)]
296pub struct ServerError {
297    listener_error: Option<GameStateIntegrationError>,
298    handler_errors: Option<Vec<GameStateIntegrationError>>,
299}
300
301impl Display for ServerError {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        write!(f, "There were one or more errors while running the server")?;
304
305        if let Some(e) = self.listener_error.as_ref() {
306            writeln!(f)?;
307            write!(f, "- {:#}", e)?;
308        }
309
310        if let Some(errors) = self.handler_errors.as_ref() {
311            for e in errors {
312                writeln!(f)?;
313                write!(f, "- {:#}", e)?;
314            }
315        }
316
317        Ok(())
318    }
319}
320
321impl StdError for ServerError {}
322
323/// A [`Server`] that handles game state integration requests.
324///
325/// The [`Server`] spawns a task per registered handler to handle events incoming from the game state integration.
326/// On server shutdown, any pending tasks are canceled. A separate listener task is spawned to actually listen
327/// for game state integration requests on the configured URI, process them to extract the payload, and broadcast
328/// each event to all registered handlers.
329pub struct Server {
330    listener: Option<task::JoinHandle<Result<(), GameStateIntegrationError>>>,
331    handlers: Vec<task::JoinHandle<Result<(), GameStateIntegrationError>>>,
332    notify_shutdown: broadcast::Sender<()>,
333    is_shutdown: bool,
334}
335
336impl Server {
337    pub fn new(
338        listener: task::JoinHandle<Result<(), GameStateIntegrationError>>,
339        handlers: impl IntoIterator<Item = task::JoinHandle<Result<(), GameStateIntegrationError>>>,
340        notify_shutdown: broadcast::Sender<()>,
341    ) -> Self {
342        Self {
343            listener: Some(listener),
344            handlers: handlers.into_iter().collect(),
345            notify_shutdown,
346            is_shutdown: false,
347        }
348    }
349
350    pub async fn run_forever(&self) {
351        let _ = self.notify_shutdown.subscribe().recv().await;
352    }
353
354    /// Shutdown the server.
355    pub async fn shutdown(&mut self) -> Result<(), ServerError> {
356        let _ = self.notify_shutdown.send(());
357
358        let listener_result = if let Some(listener) = self.listener.take() {
359            match listener.await {
360                Ok(r) => r,
361                Err(e) => Err(GameStateIntegrationError::Unknown(e)),
362            }
363        } else {
364            Ok(())
365        };
366
367        let mut handler_errors: Vec<GameStateIntegrationError> = Vec::new();
368        let mut futures: stream::FuturesUnordered<_> = self.handlers.drain(..).collect();
369        while let Some(result) = futures.next().await {
370            match result {
371                Ok(Err(e)) => handler_errors.push(e),
372                Err(e) => handler_errors.push(GameStateIntegrationError::from(e)),
373                Ok(Ok(())) => {}
374            }
375        }
376
377        self.is_shutdown = true;
378
379        match (listener_result, handler_errors.len()) {
380            (Ok(()), 0) => Ok(()),
381            (Err(e), 0) => Err(ServerError {
382                listener_error: Some(e),
383                handler_errors: None,
384            }),
385            (Ok(()), _) => Err(ServerError {
386                listener_error: None,
387                handler_errors: Some(handler_errors),
388            }),
389            (Err(e), _) => Err(ServerError {
390                listener_error: Some(e),
391                handler_errors: Some(handler_errors),
392            }),
393        }
394    }
395
396    pub fn is_shutdown(&self) -> bool {
397        self.is_shutdown
398    }
399}
400
401pub struct ServerBuilder {
402    uri: String,
403    handlers: Vec<HandlerRegistration>,
404    mut_handlers: Vec<MutHandlerRegistration>,
405    notify_shutdown: broadcast::Sender<()>,
406    send_events: broadcast::Sender<bytes::Bytes>,
407    is_shutdown: bool,
408}
409
410impl ServerBuilder {
411    /// Create a new Server with given URI.
412    ///
413    /// The provided URI must match the one used when configuring the game state integration.
414    pub fn new(uri: &str) -> Self {
415        let (notify_shutdown, _) = broadcast::channel(1);
416        let (send_events, _) = broadcast::channel(16);
417
418        Self {
419            uri: uri.to_owned(),
420            notify_shutdown,
421            send_events,
422            is_shutdown: false,
423            handlers: Vec::new(),
424            mut_handlers: Vec::new(),
425        }
426    }
427
428    /// Register a new [`Handler`] on this Server.
429    ///
430    /// Incoming events from game state integration will be broadcast to all registered handlers.
431    pub fn register<H>(mut self, handler: H) -> Self
432    where
433        H: Handler,
434    {
435        let registration = HandlerRegistration::new(
436            handler,
437            self.notify_shutdown.subscribe(),
438            self.send_events.subscribe(),
439        );
440        self.handlers.push(registration);
441
442        self
443    }
444
445    /// Register a new [`MutHandler`] on this Server.
446    ///
447    /// Incoming events from game state integration will be broadcast to all registered handlers.
448    pub fn register_mut<H>(mut self, handler: H) -> Self
449    where
450        H: MutHandler,
451    {
452        let registration = MutHandlerRegistration::new(
453            handler,
454            self.notify_shutdown.subscribe(),
455            self.send_events.subscribe(),
456        );
457        self.mut_handlers.push(registration);
458
459        self
460    }
461
462    /// Start listening to requests and return a handle to the associated [`Listener`] task.
463    pub fn start(self) -> Result<Server, GameStateIntegrationError> {
464        if self.is_shutdown {
465            return Err(GameStateIntegrationError::ServerShutdown);
466        }
467
468        let listener = Listener::new(
469            &self.uri,
470            self.notify_shutdown.subscribe(),
471            self.send_events,
472        );
473
474        let iter = self
475            .handlers
476            .into_iter()
477            .map(|h| tokio::spawn(async move { h.run().await }))
478            .chain(
479                self.mut_handlers
480                    .into_iter()
481                    .map(|h| tokio::spawn(async move { h.run().await })),
482            );
483
484        Ok(Server::new(
485            tokio::spawn(async move { listener.run().await }),
486            iter,
487            self.notify_shutdown,
488        ))
489    }
490}
491
492/// Process a game state integration request.
493///
494/// This function parses the request and reads the entire payload, returning it as a
495/// [`bytes::Bytes`].
496pub async fn process(mut socket: TcpStream) -> Result<bytes::Bytes, GameStateIntegrationError> {
497    if let Err(e) = socket.readable().await {
498        log::error!("socket is not readable");
499        return Err(GameStateIntegrationError::from(e));
500    };
501
502    let mut buf = bytes::BytesMut::with_capacity(INITIAL_REQUEST_BUFFER_CAPACITY_BYTES);
503    let request_length: usize;
504    let content_length: usize;
505
506    loop {
507        match socket.read_buf(&mut buf).await {
508            Ok(n) => n,
509            Err(e) => {
510                log::error!("failed to read request from socket: {}", e);
511                return Err(GameStateIntegrationError::from(e));
512            }
513        };
514
515        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
516        let mut r = httparse::Request::new(&mut headers);
517
518        request_length = match r.parse(&buf) {
519            Ok(httparse::Status::Complete(size)) => size,
520            Ok(httparse::Status::Partial) => {
521                log::debug!("partial request parsed, need to read more");
522                continue;
523            }
524            Err(e) => {
525                log::error!("failed to parse request: {}", e);
526                return Err(GameStateIntegrationError::from(e));
527            }
528        };
529        log::debug!("headers: {:?}", headers);
530        content_length = get_content_length_from_headers(&headers)?;
531        break;
532    }
533
534    // Call 'reserve' for additional capacity if necessary
535    let remaining = (request_length + content_length).saturating_sub(buf.len());
536    buf.reserve(remaining);
537
538    while buf.len() < request_length + content_length {
539        match socket.read_buf(&mut buf).await {
540            Ok(0) => {
541                log::error!("eof before receiving full body");
542                return Err(GameStateIntegrationError::UnexpectedEOF);
543            }
544            Ok(_) => {}
545            Err(e) => {
546                log::error!("failed to read body from socket: {}", e);
547                return Err(GameStateIntegrationError::from(e));
548            }
549        };
550    }
551
552    if let Err(e) = socket.write_all(OK.as_bytes()).await {
553        log::error!("failed to write to socket: {}", e);
554        return Err(GameStateIntegrationError::from(e));
555    };
556
557    Ok(buf.split_off(request_length).freeze())
558}
559
560/// Extract Content-Length value from a list of HTTP headers.
561pub fn get_content_length_from_headers(
562    headers: &[httparse::Header],
563) -> Result<usize, GameStateIntegrationError> {
564    match headers
565        .iter()
566        .filter(|h| h.name == "Content-Length")
567        .map(|h| h.value)
568        .next()
569    {
570        Some(value) => {
571            let str_length = match std::str::from_utf8(value) {
572                Ok(s) => s,
573                Err(e) => {
574                    return Err(GameStateIntegrationError::InvalidContentLength(format!(
575                        "failed to parse bytes as str: {}",
576                        e
577                    )));
578                }
579            };
580            match str_length.parse::<usize>() {
581                Ok(n) => Ok(n),
582                Err(e) => Err(GameStateIntegrationError::InvalidContentLength(format!(
583                    "failed to parse str into usize: {}",
584                    e
585                ))),
586            }
587        }
588        None => Err(GameStateIntegrationError::MissingContentLengthHeader),
589    }
590}
591
592#[cfg(test)]
593mod tests {
594    use super::*;
595    use std::time;
596    use tokio::sync::mpsc;
597    use tokio::time::{sleep, timeout};
598
599    const TEST_URI: &'static str = "127.0.0.1:10080";
600
601    #[test]
602    fn test_get_content_length_from_headers() {
603        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
604        let mut r = httparse::Request::new(&mut headers);
605        let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 54943\r\n\r\n";
606        r.parse(request_bytes)
607            .expect("parsing the request should never fail");
608
609        let expected = 54943 as usize;
610        let content_length =
611            get_content_length_from_headers(&r.headers).expect("failed to get Content-Length");
612
613        assert_eq!(content_length, expected);
614    }
615
616    #[test]
617    fn test_get_content_length_from_headers_not_found() {
618        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
619        let mut r = httparse::Request::new(&mut headers);
620        let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\n\r\n";
621        r.parse(request_bytes)
622            .expect("parsing the request should never fail");
623
624        let content_length = get_content_length_from_headers(&r.headers);
625
626        assert!(matches!(
627            content_length,
628            Err(GameStateIntegrationError::MissingContentLengthHeader)
629        ));
630    }
631
632    #[test]
633    fn test_get_content_length_from_headers_not_a_number() {
634        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
635        let mut r = httparse::Request::new(&mut headers);
636        let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: asdasd\r\n\r\n";
637        r.parse(request_bytes)
638            .expect("parsing the request should never fail");
639
640        let content_length = get_content_length_from_headers(&r.headers);
641
642        assert!(matches!(
643            content_length,
644            Err(GameStateIntegrationError::InvalidContentLength(_))
645        ));
646    }
647
648    #[tokio::test]
649    async fn test_process() {
650        let listener = TcpListener::bind(TEST_URI)
651            .await
652            .expect("failed to bind to address");
653        let local_addr = listener.local_addr().unwrap();
654        let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
655        let expected = b"{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
656
657        tokio::spawn(async move {
658            if let Ok((mut stream, _)) = listener.accept().await {
659                let _ = stream.write_all(sample_request).await;
660                let _ = stream.shutdown().await;
661            }
662        });
663
664        let stream = TcpStream::connect(local_addr)
665            .await
666            .expect("failed to connect to address");
667
668        let result = process(stream).await.expect("processing failed");
669        assert_eq!(result.len(), expected.len());
670        assert_eq!(result.as_ref(), expected);
671    }
672
673    #[tokio::test]
674    async fn test_server_handles_events() {
675        let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:20080\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
676        let expected = bytes::Bytes::from_static(b"{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}");
677
678        let (tx1, mut rx1) = mpsc::channel(2);
679        let (tx2, mut rx2) = mpsc::channel(2);
680
681        let mut server = ServerBuilder::new("127.0.0.1:30080")
682            .register(move |event| {
683                let tx1 = tx1.clone();
684                async move {
685                    let _ = &tx1.send(event).await?;
686                    Ok::<(), mpsc::error::SendError<bytes::Bytes>>(())
687                }
688            })
689            .register(move |event| {
690                let tx2 = tx2.clone();
691                async move {
692                    let _ = &tx2.send(event).await?;
693                    Ok::<(), mpsc::error::SendError<bytes::Bytes>>(())
694                }
695            })
696            .start()
697            .unwrap();
698
699        // Advance the event loop for listener to start
700        sleep(time::Duration::from_millis(10)).await;
701
702        tokio::spawn(async move {
703            for _ in 0..2 {
704                let mut stream = TcpStream::connect("127.0.0.1:30080").await.unwrap();
705                let _ = stream.write_all(sample_request).await;
706                let _ = stream.shutdown().await;
707            }
708        });
709
710        // Advance the event loop for events to be processed
711        sleep(time::Duration::from_millis(10)).await;
712
713        if let Err(_) = timeout(time::Duration::from_secs(5), server.shutdown()).await {
714            panic!("did not shut down in 5 seconds");
715        }
716
717        let mut v1 = Vec::new();
718        let mut v2 = Vec::new();
719
720        async fn capture(rx: &mut mpsc::Receiver<bytes::Bytes>, v: &mut Vec<bytes::Bytes>) {
721            let val = rx.recv().await;
722            v.push(val.unwrap());
723        }
724
725        if let Err(_) = timeout(time::Duration::from_secs(5), async {
726            tokio::join!(capture(&mut rx1, &mut v1), capture(&mut rx2, &mut v2));
727            tokio::join!(capture(&mut rx1, &mut v1), capture(&mut rx2, &mut v2));
728        })
729        .await
730        {
731            println!("did not receive values within 5 seconds");
732        }
733
734        assert_eq!(v1.len(), 2);
735        assert_eq!(v2.len(), 2);
736        assert_eq!(v1[0], &expected);
737        assert_eq!(v1[1], &expected);
738        assert_eq!(v2[0], &expected);
739        assert_eq!(v2[1], &expected);
740        assert!(server.is_shutdown());
741    }
742
743    #[tokio::test]
744    async fn test_listener_shutsdown_when_all_handlers_fail() {
745        let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:20080\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
746
747        let mut server = ServerBuilder::new("127.0.0.1:40080")
748            .register(move |_| async move { Err(anyhow::anyhow!("an error")) })
749            .register(move |_| async move { Err(anyhow::anyhow!("another error")) })
750            .start()
751            .unwrap();
752
753        // Advance the event loop for listener to start
754        sleep(time::Duration::from_millis(10)).await;
755
756        tokio::spawn(async move {
757            for _ in 0..2 {
758                let mut stream = TcpStream::connect("127.0.0.1:40080").await.unwrap();
759                let _ = stream.write_all(sample_request).await;
760                let _ = stream.shutdown().await;
761            }
762        });
763
764        // Process events, shut down handlers
765        sleep(time::Duration::from_millis(10)).await;
766
767        // One more event triggers listener shutdown
768        tokio::spawn(async move {
769            let mut stream = TcpStream::connect("127.0.0.1:40080").await.unwrap();
770            let _ = stream.write_all(sample_request).await;
771            let _ = stream.shutdown().await;
772        });
773
774        // Listener shuts down
775        sleep(time::Duration::from_millis(10)).await;
776
777        let _expected_handler_errors: Vec<GameStateIntegrationError> = vec![
778            GameStateIntegrationError::Handler {
779                source: anyhow::anyhow!("an error"),
780            },
781            GameStateIntegrationError::Handler {
782                source: anyhow::anyhow!("another error"),
783            },
784        ];
785        match timeout(time::Duration::from_secs(5), server.shutdown()).await {
786            Err(_) => {
787                panic!("did not finish in 5 seconds");
788            }
789            Ok(result) => {
790                assert!(matches!(
791                    result,
792                    Err(ServerError {
793                        listener_error: Some(GameStateIntegrationError::NoHandlersAvailable),
794                        handler_errors: Some(_expected_handler_errors)
795                    })
796                ));
797            }
798        }
799    }
800}