dota/
lib.rs

1//! Game State Integration with Dota 2.
2//!
3//! Provides a [`Server`] that listens for JSON events sent by Dota 2. Enabling game state
4//! integration requires:
5//! 1. Creating a `.cfg` [configuration file] in the Dota 2 game configuration directory.
6//! 2. Running Dota 2 with the -gamestateintegration [launch option].
7//!
8//! The configuration file can have any name name, but must be prefixed by `gamestate_integration_`.
9//! For example, `gamestate_integration_test.cfg` would be located:
10//! * In Linux: `~/.steam/steam/steamapps/common/dota 2 beta/game/dota/cfg/gamestate_integration_test.cfg`
11//! * In Windows: `D:\Steam\steamapps\common\dota 2 beta\dota\cfg\gamestate_integration_test.cfg`
12//!
13//! Here's a sample configuration file:
14//!
15//! "dota2-gsi Configuration"
16//!{
17//!    "uri"               "http://127.0.0.1:53000/"
18//!    "timeout"           "5.0"
19//!    "buffer"            "0.1"
20//!    "throttle"          "0.1"
21//!    "heartbeat"         "30.0"
22//!    "data"
23//!    {
24//!        "buildings"     "1"
25//!        "provider"      "1"
26//!        "map"           "1"
27//!        "player"        "1"
28//!        "hero"          "1"
29//!        "abilities"     "1"
30//!        "items"         "1"
31//!        "draft"         "1"
32//!        "wearables"     "1"
33//!    }
34//!    "auth"
35//!    {
36//!        "token"         "abcdefghijklmopqrstuvxyz123456789"
37//!    }
38//!}
39//!
40//! Note the URI used in the configuration file must be the same URI used with a [`ServerBuilder`].
41//!
42//! [^configuration file]: Details on configuration file: https://developer.valvesoftware.com/wiki/Counter-Strike:_Global_Offensive_Game_State_Integration
43//! [^launch option]: Available launch options: https://help.steampowered.com/en/faqs/view/7d01-d2dd-d75e-2955
44use std::error::Error as StdError;
45use std::fmt::{self, Display};
46use std::future::Future;
47use std::io;
48
49use async_trait::async_trait;
50use futures::{StreamExt, stream};
51use tokio::io::{AsyncReadExt, AsyncWriteExt};
52use tokio::net::{TcpListener, TcpStream};
53use tokio::sync::broadcast;
54use tokio::task;
55
56pub mod components;
57pub mod handlers;
58
59/// The payload sent on every game state integration request is usually between 50-60kb.
60/// We initialize a buffer to read the request with this initial capacity.
61/// The code then looks at the Content-Length header to reserve the required capacity.
62const INITIAL_REQUEST_BUFFER_CAPACITY_BYTES: usize = 1024;
63
64/// The POST game state integration request includes this number of headers.
65/// We parse them to find the Content-Length.
66const EXPECTED_NUMBER_OF_HEADERS: usize = 7;
67
68/// The response expected for every game state integration request.
69/// Failure to deliver this response would cause the request to be retried infinitely.
70const OK: &str = "HTTP/1.1 200 OK\ncontent-type: text/html\n";
71
72#[derive(thiserror::Error, Debug)]
73pub enum GameStateIntegrationError {
74    #[error("incomplete headers from game state integration request")]
75    IncompleteHeaders,
76    #[error("failed to read from socket")]
77    SocketRead(#[from] io::Error),
78    #[error("no handlers available to process request, is the server shutting down?")]
79    NoHandlersAvailable,
80    #[error("invalid content length header: {0}")]
81    InvalidContentLength(String),
82    #[error("missing Content-Length header in request")]
83    MissingContentLengthHeader,
84    #[error("invalid request received")]
85    InvalidRequest(#[from] httparse::Error),
86    #[error("server has already shutdown")]
87    ServerShutdown,
88    #[error("handler failed when handling event")]
89    Handler {
90        #[source]
91        source: anyhow::Error,
92    },
93    #[error("an error occurred while running the server")]
94    Unknown(#[from] task::JoinError),
95}
96
97pub type HandlerResult = Result<(), anyhow::Error>;
98
99/// Trait for any async function or struct that can be used to handle game state integration events.
100///
101/// This trait is automatically implemented for async functions and closures.
102#[async_trait]
103pub trait Handler: Send + Sync + 'static {
104    async fn handle(&self, event: bytes::Bytes) -> HandlerResult;
105}
106
107#[async_trait]
108impl<F, Fut, E> Handler for F
109where
110    F: Fn(bytes::Bytes) -> Fut + Send + Sync + 'static,
111    Fut: Future<Output = Result<(), E>> + Send,
112    E: Into<anyhow::Error>,
113{
114    async fn handle(&self, event: bytes::Bytes) -> HandlerResult {
115        (self)(event).await.map_err(|e| e.into())?;
116        Ok(())
117    }
118}
119
120/// Manage lifecycle of a handler registered in a server
121pub(crate) struct HandlerRegistration {
122    inner: Box<dyn Handler>,
123    is_shutdown: bool,
124    notify: broadcast::Receiver<()>,
125    events: broadcast::Receiver<bytes::Bytes>,
126}
127
128impl HandlerRegistration {
129    pub(crate) fn new<H>(
130        handler: H,
131        notify: broadcast::Receiver<()>,
132        events: broadcast::Receiver<bytes::Bytes>,
133    ) -> Self
134    where
135        H: Handler,
136    {
137        Self {
138            inner: Box::new(handler),
139            is_shutdown: false,
140            notify,
141            events,
142        }
143    }
144
145    pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
146        loop {
147            tokio::select! {
148                received = self.events.recv() => {
149                    match received {
150                        Ok(event) => {
151                            if let Err(e) = self.inner.handle(event).await {
152                                return Err(GameStateIntegrationError::Handler{source: e});
153                            };
154                        },
155                        Err(_) => {break;}
156                    }
157                }
158                _ = self.notify.recv() => {
159                    break;
160                }
161            }
162        }
163
164        self.is_shutdown = true;
165
166        Ok(())
167    }
168
169    #[allow(dead_code)]
170    pub(crate) fn is_shutdown(&self) -> bool {
171        self.is_shutdown
172    }
173}
174
175/// Manage lifecycle of a server's listening task
176pub(crate) struct Listener {
177    uri: String,
178    is_shutdown: bool,
179    notify: broadcast::Receiver<()>,
180    send_events: broadcast::Sender<bytes::Bytes>,
181}
182
183impl Listener {
184    pub(crate) fn new(
185        uri: &str,
186        notify: broadcast::Receiver<()>,
187        send_events: broadcast::Sender<bytes::Bytes>,
188    ) -> Self {
189        Self {
190            uri: uri.to_owned(),
191            is_shutdown: false,
192            notify,
193            send_events,
194        }
195    }
196
197    pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
198        let listener = TcpListener::bind(&self.uri).await?;
199        log::info!("Listening on: {:?}", listener.local_addr());
200
201        loop {
202            tokio::select! {
203                accepted = listener.accept() => {
204                    let (socket, _) = match accepted {
205                        Ok(val) => val,
206                        Err(e) => {
207                            self.is_shutdown = true;
208                            return Err(GameStateIntegrationError::SocketRead(e));
209                        }
210                    };
211
212                    if self.send_events.receiver_count() == 0 {
213                        // terminate if no handlers available
214                        return Err(GameStateIntegrationError::NoHandlersAvailable);
215                    }
216
217                    let sender = self.send_events.clone();
218
219                    tokio::spawn(async move {
220                        match process(socket).await {
221                            Err(e) => {
222                                log::error!("{}", e);
223                                Err(e)
224                            }
225                            Ok(buf) => match sender.send(buf) {
226                                Ok(_) => Ok(()),
227                                Err(_) => {
228                                    // send can only fail if there are no active receivers
229                                    // meaning no where registered or the server is shutting down.
230                                    Err(GameStateIntegrationError::NoHandlersAvailable)
231                                }
232                            },
233                        }
234                    });
235
236                }
237                _ = self.notify.recv() => {
238                    break;
239                }
240            }
241        }
242
243        self.is_shutdown = true;
244
245        Ok(())
246    }
247
248    #[allow(dead_code)]
249    pub(crate) fn is_shutdown(&self) -> bool {
250        self.is_shutdown
251    }
252}
253
254#[derive(Debug)]
255pub struct ServerError {
256    listener_error: Option<GameStateIntegrationError>,
257    handler_errors: Option<Vec<GameStateIntegrationError>>,
258}
259
260impl Display for ServerError {
261    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262        write!(f, "There were one or more errors while running the server")?;
263
264        if let Some(e) = self.listener_error.as_ref() {
265            writeln!(f)?;
266            write!(f, "- {:#}", e)?;
267        }
268
269        if let Some(errors) = self.handler_errors.as_ref() {
270            for e in errors {
271                writeln!(f)?;
272                write!(f, "- {:#}", e)?;
273            }
274        }
275
276        Ok(())
277    }
278}
279
280impl StdError for ServerError {}
281
282/// A [`Server`] that handles game state integration requests.
283///
284/// The [`Server`] spawns a task per registered handler to handle events incoming from the game state integration.
285/// On server shutdown, any pending tasks are canceled. A separate listener task is spawned to actually listen
286/// for game state integration requests on the configured URI, process them to extract the payload, and broadcast
287/// each event to all registered handlers.
288pub struct Server {
289    listener: Option<task::JoinHandle<Result<(), GameStateIntegrationError>>>,
290    handlers: Vec<task::JoinHandle<Result<(), GameStateIntegrationError>>>,
291    notify_shutdown: broadcast::Sender<()>,
292    is_shutdown: bool,
293}
294
295impl Server {
296    pub fn new(
297        listener: task::JoinHandle<Result<(), GameStateIntegrationError>>,
298        handlers: impl IntoIterator<Item = task::JoinHandle<Result<(), GameStateIntegrationError>>>,
299        notify_shutdown: broadcast::Sender<()>,
300    ) -> Self {
301        Self {
302            listener: Some(listener),
303            handlers: handlers.into_iter().collect(),
304            notify_shutdown,
305            is_shutdown: false,
306        }
307    }
308
309    pub async fn run_forever(&self) {
310        let _ = self.notify_shutdown.subscribe().recv().await;
311    }
312
313    /// Shutdown the server.
314    pub async fn shutdown(&mut self) -> Result<(), ServerError> {
315        let _ = self.notify_shutdown.send(());
316
317        let listener_result = if let Some(listener) = self.listener.take() {
318            match listener.await {
319                Ok(r) => r,
320                Err(e) => Err(GameStateIntegrationError::Unknown(e)),
321            }
322        } else {
323            Ok(())
324        };
325
326        let mut handler_errors: Vec<GameStateIntegrationError> = Vec::new();
327        let mut futures: stream::FuturesUnordered<_> = self.handlers.drain(..).collect();
328        while let Some(result) = futures.next().await {
329            match result {
330                Ok(Err(e)) => handler_errors.push(e),
331                Err(e) => handler_errors.push(GameStateIntegrationError::from(e)),
332                Ok(Ok(())) => {}
333            }
334        }
335
336        self.is_shutdown = true;
337
338        match (listener_result, handler_errors.len()) {
339            (Ok(()), 0) => Ok(()),
340            (Err(e), 0) => Err(ServerError {
341                listener_error: Some(e),
342                handler_errors: None,
343            }),
344            (Ok(()), _) => Err(ServerError {
345                listener_error: None,
346                handler_errors: Some(handler_errors),
347            }),
348            (Err(e), _) => Err(ServerError {
349                listener_error: Some(e),
350                handler_errors: Some(handler_errors),
351            }),
352        }
353    }
354
355    pub fn is_shutdown(&self) -> bool {
356        self.is_shutdown
357    }
358}
359
360pub struct ServerBuilder {
361    uri: String,
362    handlers: Vec<HandlerRegistration>,
363    notify_shutdown: broadcast::Sender<()>,
364    send_events: broadcast::Sender<bytes::Bytes>,
365    is_shutdown: bool,
366}
367
368impl ServerBuilder {
369    /// Create a new Server with given URI.
370    ///
371    /// The provided URI must match the one used when configuring the game state integration.
372    pub fn new(uri: &str) -> Self {
373        let (notify_shutdown, _) = broadcast::channel(1);
374        let (send_events, _) = broadcast::channel(16);
375
376        Self {
377            uri: uri.to_owned(),
378            notify_shutdown,
379            send_events,
380            is_shutdown: false,
381            handlers: Vec::new(),
382        }
383    }
384
385    /// Register a new handler on this Server.
386    ///
387    /// Incoming events from game state integration will be broadcast to all registered handlers.
388    pub fn register<H>(mut self, handler: H) -> Self
389    where
390        H: Handler,
391    {
392        let registration = HandlerRegistration::new(
393            handler,
394            self.notify_shutdown.subscribe(),
395            self.send_events.subscribe(),
396        );
397        self.handlers.push(registration);
398
399        self
400    }
401
402    /// Start listening to requests and return a handle to the associated [`Listener`] task.
403    pub fn start(self) -> Result<Server, GameStateIntegrationError> {
404        if self.is_shutdown {
405            return Err(GameStateIntegrationError::ServerShutdown);
406        }
407
408        let listener = Listener::new(
409            &self.uri,
410            self.notify_shutdown.subscribe(),
411            self.send_events,
412        );
413
414        Ok(Server::new(
415            tokio::spawn(async move { listener.run().await }),
416            self.handlers
417                .into_iter()
418                .map(|h| tokio::spawn(async move { h.run().await })),
419            self.notify_shutdown,
420        ))
421    }
422}
423
424/// Process a game state integration request.
425///
426/// This function parses the request and reads the entire payload, returning it as a
427/// [`bytes::Bytes`].
428pub async fn process(mut socket: TcpStream) -> Result<bytes::Bytes, GameStateIntegrationError> {
429    if let Err(e) = socket.readable().await {
430        log::error!("socket is not readable");
431        return Err(GameStateIntegrationError::from(e));
432    };
433
434    let mut buf = bytes::BytesMut::with_capacity(INITIAL_REQUEST_BUFFER_CAPACITY_BYTES);
435    let request_length: usize;
436    let content_length: usize;
437
438    loop {
439        match socket.read_buf(&mut buf).await {
440            Ok(n) => n,
441            Err(e) => {
442                log::error!("failed to read request from socket: {}", e);
443                return Err(GameStateIntegrationError::from(e));
444            }
445        };
446
447        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
448        let mut r = httparse::Request::new(&mut headers);
449
450        request_length = match r.parse(&buf) {
451            Ok(httparse::Status::Complete(size)) => size,
452            Ok(httparse::Status::Partial) => {
453                log::debug!("partial request parsed, need to read more");
454                continue;
455            }
456            Err(e) => {
457                log::error!("failed to parse request: {}", e);
458                return Err(GameStateIntegrationError::from(e));
459            }
460        };
461        log::debug!("headers: {:?}", headers);
462        content_length = get_content_length_from_headers(&headers)?;
463        break;
464    }
465
466    if buf.len() <= request_length + content_length {
467        buf.reserve(request_length + content_length);
468        match socket.read_buf(&mut buf).await {
469            Ok(n) => n,
470            Err(e) => {
471                log::error!("failed to read body from socket: {}", e);
472                return Err(GameStateIntegrationError::from(e));
473            }
474        };
475    }
476
477    if let Err(e) = socket.write_all(OK.as_bytes()).await {
478        log::error!("failed to write to socket: {}", e);
479        return Err(GameStateIntegrationError::from(e));
480    };
481
482    Ok(buf.split_off(request_length).freeze())
483}
484
485/// Extract Content-Length value from a list of HTTP headers.
486pub fn get_content_length_from_headers(
487    headers: &[httparse::Header],
488) -> Result<usize, GameStateIntegrationError> {
489    match headers
490        .iter()
491        .filter(|h| h.name == "Content-Length")
492        .map(|h| h.value)
493        .next()
494    {
495        Some(value) => {
496            let str_length = match std::str::from_utf8(value) {
497                Ok(s) => s,
498                Err(e) => {
499                    return Err(GameStateIntegrationError::InvalidContentLength(format!(
500                        "failed to parse bytes as str: {}",
501                        e
502                    )));
503                }
504            };
505            match str_length.parse::<usize>() {
506                Ok(n) => Ok(n),
507                Err(e) => Err(GameStateIntegrationError::InvalidContentLength(format!(
508                    "failed to parse str into usize: {}",
509                    e
510                ))),
511            }
512        }
513        None => Err(GameStateIntegrationError::MissingContentLengthHeader),
514    }
515}
516
517#[cfg(test)]
518mod tests {
519    use super::*;
520    use std::time;
521    use tokio::sync::mpsc;
522    use tokio::time::{sleep, timeout};
523
524    const TEST_URI: &'static str = "127.0.0.1:10080";
525
526    #[test]
527    fn test_get_content_length_from_headers() {
528        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
529        let mut r = httparse::Request::new(&mut headers);
530        let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 54943\r\n\r\n";
531        r.parse(request_bytes)
532            .expect("parsing the request should never fail");
533
534        let expected = 54943 as usize;
535        let content_length =
536            get_content_length_from_headers(&r.headers).expect("failed to get Content-Length");
537
538        assert_eq!(content_length, expected);
539    }
540
541    #[test]
542    fn test_get_content_length_from_headers_not_found() {
543        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
544        let mut r = httparse::Request::new(&mut headers);
545        let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\n\r\n";
546        r.parse(request_bytes)
547            .expect("parsing the request should never fail");
548
549        let content_length = get_content_length_from_headers(&r.headers);
550
551        assert!(matches!(
552            content_length,
553            Err(GameStateIntegrationError::MissingContentLengthHeader)
554        ));
555    }
556
557    #[test]
558    fn test_get_content_length_from_headers_not_a_number() {
559        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
560        let mut r = httparse::Request::new(&mut headers);
561        let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: asdasd\r\n\r\n";
562        r.parse(request_bytes)
563            .expect("parsing the request should never fail");
564
565        let content_length = get_content_length_from_headers(&r.headers);
566
567        assert!(matches!(
568            content_length,
569            Err(GameStateIntegrationError::InvalidContentLength(_))
570        ));
571    }
572
573    #[tokio::test]
574    async fn test_process() {
575        let listener = TcpListener::bind(TEST_URI)
576            .await
577            .expect("failed to bind to address");
578        let local_addr = listener.local_addr().unwrap();
579        let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
580        let expected = b"{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
581
582        tokio::spawn(async move {
583            if let Ok((mut stream, _)) = listener.accept().await {
584                let _ = stream.write_all(sample_request).await;
585                let _ = stream.shutdown().await;
586            }
587        });
588
589        let stream = TcpStream::connect(local_addr)
590            .await
591            .expect("failed to connect to address");
592
593        let result = process(stream).await.expect("processing failed");
594        assert_eq!(result.len(), expected.len());
595        assert_eq!(result.as_ref(), expected);
596    }
597
598    #[tokio::test]
599    async fn test_server_handles_events() {
600        let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:20080\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
601        let expected = bytes::Bytes::from_static(b"{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}");
602
603        let (tx1, mut rx1) = mpsc::channel(2);
604        let (tx2, mut rx2) = mpsc::channel(2);
605
606        let mut server = ServerBuilder::new("127.0.0.1:30080")
607            .register(move |event| {
608                let tx1 = tx1.clone();
609                async move {
610                    let _ = &tx1.send(event).await?;
611                    Ok::<(), mpsc::error::SendError<bytes::Bytes>>(())
612                }
613            })
614            .register(move |event| {
615                let tx2 = tx2.clone();
616                async move {
617                    let _ = &tx2.send(event).await?;
618                    Ok::<(), mpsc::error::SendError<bytes::Bytes>>(())
619                }
620            })
621            .start()
622            .unwrap();
623
624        // Advance the event loop for listener to start
625        sleep(time::Duration::from_millis(10)).await;
626
627        tokio::spawn(async move {
628            for _ in 0..2 {
629                let mut stream = TcpStream::connect("127.0.0.1:30080").await.unwrap();
630                let _ = stream.write_all(sample_request).await;
631                let _ = stream.shutdown().await;
632            }
633        });
634
635        // Advance the event loop for events to be processed
636        sleep(time::Duration::from_millis(10)).await;
637
638        if let Err(_) = timeout(time::Duration::from_secs(5), server.shutdown()).await {
639            panic!("did not shut down in 5 seconds");
640        }
641
642        let mut v1 = Vec::new();
643        let mut v2 = Vec::new();
644
645        async fn capture(rx: &mut mpsc::Receiver<bytes::Bytes>, v: &mut Vec<bytes::Bytes>) {
646            let val = rx.recv().await;
647            v.push(val.unwrap());
648        }
649
650        if let Err(_) = timeout(time::Duration::from_secs(5), async {
651            tokio::join!(capture(&mut rx1, &mut v1), capture(&mut rx2, &mut v2));
652            tokio::join!(capture(&mut rx1, &mut v1), capture(&mut rx2, &mut v2));
653        })
654        .await
655        {
656            println!("did not receive values within 5 seconds");
657        }
658
659        assert_eq!(v1.len(), 2);
660        assert_eq!(v2.len(), 2);
661        assert_eq!(v1[0], &expected);
662        assert_eq!(v1[1], &expected);
663        assert_eq!(v2[0], &expected);
664        assert_eq!(v2[1], &expected);
665        assert!(server.is_shutdown());
666    }
667
668    #[tokio::test]
669    async fn test_listener_shutsdown_when_all_handlers_fail() {
670        let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:20080\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
671
672        let mut server = ServerBuilder::new("127.0.0.1:40080")
673            .register(move |_| async move { Err(anyhow::anyhow!("an error")) })
674            .register(move |_| async move { Err(anyhow::anyhow!("another error")) })
675            .start()
676            .unwrap();
677
678        // Advance the event loop for listener to start
679        sleep(time::Duration::from_millis(10)).await;
680
681        tokio::spawn(async move {
682            for _ in 0..2 {
683                let mut stream = TcpStream::connect("127.0.0.1:40080").await.unwrap();
684                let _ = stream.write_all(sample_request).await;
685                let _ = stream.shutdown().await;
686            }
687        });
688
689        // Process events, shut down handlers
690        sleep(time::Duration::from_millis(10)).await;
691
692        // One more event triggers listener shutdown
693        tokio::spawn(async move {
694            let mut stream = TcpStream::connect("127.0.0.1:40080").await.unwrap();
695            let _ = stream.write_all(sample_request).await;
696            let _ = stream.shutdown().await;
697        });
698
699        // Listener shuts down
700        sleep(time::Duration::from_millis(10)).await;
701
702        let _expected_handler_errors: Vec<GameStateIntegrationError> = vec![
703            GameStateIntegrationError::Handler {
704                source: anyhow::anyhow!("an error"),
705            },
706            GameStateIntegrationError::Handler {
707                source: anyhow::anyhow!("another error"),
708            },
709        ];
710        match timeout(time::Duration::from_secs(5), server.shutdown()).await {
711            Err(_) => {
712                panic!("did not finish in 5 seconds");
713            }
714            Ok(result) => {
715                assert!(matches!(
716                    result,
717                    Err(ServerError {
718                        listener_error: Some(GameStateIntegrationError::NoHandlersAvailable),
719                        handler_errors: Some(_expected_handler_errors)
720                    })
721                ));
722            }
723        }
724    }
725}