1use std::error::Error as StdError;
45use std::fmt::{self, Display};
46use std::future::Future;
47use std::io;
48
49use async_trait::async_trait;
50use futures::{StreamExt, stream};
51use tokio::io::{AsyncReadExt, AsyncWriteExt};
52use tokio::net::{TcpListener, TcpStream};
53use tokio::sync::broadcast;
54use tokio::task;
55
56#[cfg(feature = "models")]
57pub mod components;
58#[cfg(feature = "diff")]
59pub mod diff;
60#[cfg(feature = "diff")]
61pub mod event;
62#[cfg(feature = "handlers")]
63pub mod handlers;
64
/// Initial capacity, in bytes, of the buffer each incoming request is read into.
const INITIAL_REQUEST_BUFFER_CAPACITY_BYTES: usize = 1024;

/// Number of header slots handed to `httparse` when parsing a request.
const EXPECTED_NUMBER_OF_HEADERS: usize = 7;

/// Response written back to the client once a request has been read in full.
///
/// HTTP/1.1 requires CRLF line endings and an empty line that terminates the
/// header section. `content-length: 0` states explicitly that no body follows,
/// so the client does not have to wait for the connection to close to know the
/// response is complete.
const OK: &str = "HTTP/1.1 200 OK\r\ncontent-type: text/html\r\ncontent-length: 0\r\n\r\n";
77
/// Errors produced while accepting, reading, parsing and dispatching game
/// state integration requests.
#[derive(thiserror::Error, Debug)]
pub enum GameStateIntegrationError {
    /// The request headers were incomplete.
    // NOTE(review): this variant is not constructed anywhere in the visible code.
    #[error("incomplete headers from game state integration request")]
    IncompleteHeaders,
    /// An I/O error on the listening or connection socket. Despite the message,
    /// this file also produces it for bind, accept and write failures because
    /// every `io::Error` is converted into this variant.
    #[error("failed to read from socket")]
    SocketRead(#[from] io::Error),
    /// No handler is subscribed to the event channel any more.
    #[error("no handlers available to process request, is the server shutting down?")]
    NoHandlersAvailable,
    /// The `Content-Length` header value was not valid UTF-8 or not a `usize`;
    /// the payload describes which of the two failed.
    #[error("invalid content length header: {0}")]
    InvalidContentLength(String),
    /// The request carried no `Content-Length` header.
    #[error("missing Content-Length header in request")]
    MissingContentLengthHeader,
    /// `httparse` rejected the request.
    #[error("invalid request received")]
    InvalidRequest(#[from] httparse::Error),
    /// The peer closed the connection before the full body was received.
    #[error("unexpected EOF")]
    UnexpectedEOF,
    /// Returned by `ServerBuilder::start` when the builder is flagged as shut down.
    #[error("server has already shutdown")]
    ServerShutdown,
    /// A registered handler returned an error for an event.
    #[error("handler failed when handling event")]
    Handler {
        /// The error returned by the handler.
        #[source]
        source: anyhow::Error,
    },
    /// A spawned listener or handler task could not be joined.
    #[error("an error occurred while running the server")]
    Unknown(#[from] task::JoinError),
}
104
105pub type HandlerResult = Result<(), anyhow::Error>;
106
/// A handler that processes events through a shared reference.
///
/// Implementors must be `Send + Sync + 'static` because each registered
/// handler is boxed and driven from its own spawned task.
#[async_trait]
pub trait Handler: Send + Sync + 'static {
    /// Handles one event. `event` is the raw payload broadcast by the
    /// listener, i.e. the request body returned by `process`.
    ///
    /// Returning an error makes the registration's `run` loop stop and report
    /// it as `GameStateIntegrationError::Handler`.
    async fn handle(&self, event: bytes::Bytes) -> HandlerResult;
}
114
/// A handler that needs exclusive (`&mut self`) access while processing an
/// event, for example to update state it owns.
#[async_trait]
pub trait MutHandler: Send + Sync + 'static {
    /// Handles one event. `event` is the raw payload broadcast by the
    /// listener, i.e. the request body returned by `process`.
    ///
    /// Returning an error makes the registration's `run` loop stop and report
    /// it as `GameStateIntegrationError::Handler`.
    async fn handle(&mut self, event: bytes::Bytes) -> HandlerResult;
}
120
121#[async_trait]
122impl<F, Fut, E> Handler for F
123where
124 F: Fn(bytes::Bytes) -> Fut + Send + Sync + 'static,
125 Fut: Future<Output = Result<(), E>> + Send,
126 E: Into<anyhow::Error>,
127{
128 async fn handle(&self, event: bytes::Bytes) -> HandlerResult {
129 (self)(event).await.map_err(|e| e.into())?;
130 Ok(())
131 }
132}
133
134pub(crate) struct HandlerRegistration {
136 inner: Box<dyn Handler>,
137 notify: broadcast::Receiver<()>,
138 events: broadcast::Receiver<bytes::Bytes>,
139}
140
141impl HandlerRegistration {
142 pub(crate) fn new<H>(
143 handler: H,
144 notify: broadcast::Receiver<()>,
145 events: broadcast::Receiver<bytes::Bytes>,
146 ) -> Self
147 where
148 H: Handler,
149 {
150 Self {
151 inner: Box::new(handler),
152 notify,
153 events,
154 }
155 }
156
157 pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
158 loop {
159 tokio::select! {
160 received = self.events.recv() => {
161 match received {
162 Ok(event) => {
163 if let Err(e) = self.inner.handle(event).await {
164 return Err(GameStateIntegrationError::Handler{source: e});
165 };
166 },
167 Err(_) => {break;}
168 }
169 }
170 _ = self.notify.recv() => {
171 break;
172 }
173 }
174 }
175
176 Ok(())
177 }
178}
179
180pub(crate) struct MutHandlerRegistration {
182 inner: Box<dyn MutHandler>,
183 notify: broadcast::Receiver<()>,
184 events: broadcast::Receiver<bytes::Bytes>,
185}
186
187impl MutHandlerRegistration {
188 pub(crate) fn new<H>(
189 handler: H,
190 notify: broadcast::Receiver<()>,
191 events: broadcast::Receiver<bytes::Bytes>,
192 ) -> Self
193 where
194 H: MutHandler,
195 {
196 Self {
197 inner: Box::new(handler),
198 notify,
199 events,
200 }
201 }
202
203 pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
204 loop {
205 tokio::select! {
206 received = self.events.recv() => {
207 match received {
208 Ok(event) => {
209 if let Err(e) = self.inner.handle(event).await {
210 return Err(GameStateIntegrationError::Handler{source: e});
211 };
212 },
213 Err(_) => {break;}
214 }
215 }
216 _ = self.notify.recv() => {
217 break;
218 }
219 }
220 }
221
222 Ok(())
223 }
224}
225
226pub(crate) struct Listener {
228 uri: String,
229 notify: broadcast::Receiver<()>,
230 send_events: broadcast::Sender<bytes::Bytes>,
231}
232
233impl Listener {
234 pub(crate) fn new(
235 uri: &str,
236 notify: broadcast::Receiver<()>,
237 send_events: broadcast::Sender<bytes::Bytes>,
238 ) -> Self {
239 Self {
240 uri: uri.to_owned(),
241 notify,
242 send_events,
243 }
244 }
245
246 pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
247 let listener = TcpListener::bind(&self.uri).await?;
248 log::info!("Listening on: {:?}", listener.local_addr());
249
250 loop {
251 tokio::select! {
252 accepted = listener.accept() => {
253 let (socket, _) = match accepted {
254 Ok(val) => val,
255 Err(e) => {
256 return Err(GameStateIntegrationError::SocketRead(e));
257 }
258 };
259
260 if self.send_events.receiver_count() == 0 {
261 return Err(GameStateIntegrationError::NoHandlersAvailable);
263 }
264
265 let sender = self.send_events.clone();
266
267 tokio::spawn(async move {
268 match process(socket).await {
269 Err(e) => {
270 log::error!("{}", e);
271 Err(e)
272 }
273 Ok(buf) => match sender.send(buf) {
274 Ok(_) => Ok(()),
275 Err(_) => {
276 Err(GameStateIntegrationError::NoHandlersAvailable)
279 }
280 },
281 }
282 });
283
284 }
285 _ = self.notify.recv() => {
286 break;
287 }
288 }
289 }
290
291 Ok(())
292 }
293}
294
295#[derive(Debug)]
296pub struct ServerError {
297 listener_error: Option<GameStateIntegrationError>,
298 handler_errors: Option<Vec<GameStateIntegrationError>>,
299}
300
301impl Display for ServerError {
302 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303 write!(f, "There were one or more errors while running the server")?;
304
305 if let Some(e) = self.listener_error.as_ref() {
306 writeln!(f)?;
307 write!(f, "- {:#}", e)?;
308 }
309
310 if let Some(errors) = self.handler_errors.as_ref() {
311 for e in errors {
312 writeln!(f)?;
313 write!(f, "- {:#}", e)?;
314 }
315 }
316
317 Ok(())
318 }
319}
320
321impl StdError for ServerError {}
322
323pub struct Server {
330 listener: Option<task::JoinHandle<Result<(), GameStateIntegrationError>>>,
331 handlers: Vec<task::JoinHandle<Result<(), GameStateIntegrationError>>>,
332 notify_shutdown: broadcast::Sender<()>,
333 is_shutdown: bool,
334}
335
336impl Server {
337 pub fn new(
338 listener: task::JoinHandle<Result<(), GameStateIntegrationError>>,
339 handlers: impl IntoIterator<Item = task::JoinHandle<Result<(), GameStateIntegrationError>>>,
340 notify_shutdown: broadcast::Sender<()>,
341 ) -> Self {
342 Self {
343 listener: Some(listener),
344 handlers: handlers.into_iter().collect(),
345 notify_shutdown,
346 is_shutdown: false,
347 }
348 }
349
350 pub async fn run_forever(&self) {
351 let _ = self.notify_shutdown.subscribe().recv().await;
352 }
353
354 pub async fn shutdown(&mut self) -> Result<(), ServerError> {
356 let _ = self.notify_shutdown.send(());
357
358 let listener_result = if let Some(listener) = self.listener.take() {
359 match listener.await {
360 Ok(r) => r,
361 Err(e) => Err(GameStateIntegrationError::Unknown(e)),
362 }
363 } else {
364 Ok(())
365 };
366
367 let mut handler_errors: Vec<GameStateIntegrationError> = Vec::new();
368 let mut futures: stream::FuturesUnordered<_> = self.handlers.drain(..).collect();
369 while let Some(result) = futures.next().await {
370 match result {
371 Ok(Err(e)) => handler_errors.push(e),
372 Err(e) => handler_errors.push(GameStateIntegrationError::from(e)),
373 Ok(Ok(())) => {}
374 }
375 }
376
377 self.is_shutdown = true;
378
379 match (listener_result, handler_errors.len()) {
380 (Ok(()), 0) => Ok(()),
381 (Err(e), 0) => Err(ServerError {
382 listener_error: Some(e),
383 handler_errors: None,
384 }),
385 (Ok(()), _) => Err(ServerError {
386 listener_error: None,
387 handler_errors: Some(handler_errors),
388 }),
389 (Err(e), _) => Err(ServerError {
390 listener_error: Some(e),
391 handler_errors: Some(handler_errors),
392 }),
393 }
394 }
395
396 pub fn is_shutdown(&self) -> bool {
397 self.is_shutdown
398 }
399}
400
401pub struct ServerBuilder {
402 uri: String,
403 handlers: Vec<HandlerRegistration>,
404 mut_handlers: Vec<MutHandlerRegistration>,
405 notify_shutdown: broadcast::Sender<()>,
406 send_events: broadcast::Sender<bytes::Bytes>,
407 is_shutdown: bool,
408}
409
410impl ServerBuilder {
411 pub fn new(uri: &str) -> Self {
415 let (notify_shutdown, _) = broadcast::channel(1);
416 let (send_events, _) = broadcast::channel(16);
417
418 Self {
419 uri: uri.to_owned(),
420 notify_shutdown,
421 send_events,
422 is_shutdown: false,
423 handlers: Vec::new(),
424 mut_handlers: Vec::new(),
425 }
426 }
427
428 pub fn register<H>(mut self, handler: H) -> Self
432 where
433 H: Handler,
434 {
435 let registration = HandlerRegistration::new(
436 handler,
437 self.notify_shutdown.subscribe(),
438 self.send_events.subscribe(),
439 );
440 self.handlers.push(registration);
441
442 self
443 }
444
445 pub fn register_mut<H>(mut self, handler: H) -> Self
449 where
450 H: MutHandler,
451 {
452 let registration = MutHandlerRegistration::new(
453 handler,
454 self.notify_shutdown.subscribe(),
455 self.send_events.subscribe(),
456 );
457 self.mut_handlers.push(registration);
458
459 self
460 }
461
462 pub fn start(self) -> Result<Server, GameStateIntegrationError> {
464 if self.is_shutdown {
465 return Err(GameStateIntegrationError::ServerShutdown);
466 }
467
468 let listener = Listener::new(
469 &self.uri,
470 self.notify_shutdown.subscribe(),
471 self.send_events,
472 );
473
474 let iter = self
475 .handlers
476 .into_iter()
477 .map(|h| tokio::spawn(async move { h.run().await }))
478 .chain(
479 self.mut_handlers
480 .into_iter()
481 .map(|h| tokio::spawn(async move { h.run().await })),
482 );
483
484 Ok(Server::new(
485 tokio::spawn(async move { listener.run().await }),
486 iter,
487 self.notify_shutdown,
488 ))
489 }
490}
491
492pub async fn process(mut socket: TcpStream) -> Result<bytes::Bytes, GameStateIntegrationError> {
497 if let Err(e) = socket.readable().await {
498 log::error!("socket is not readable");
499 return Err(GameStateIntegrationError::from(e));
500 };
501
502 let mut buf = bytes::BytesMut::with_capacity(INITIAL_REQUEST_BUFFER_CAPACITY_BYTES);
503 let request_length: usize;
504 let content_length: usize;
505
506 loop {
507 match socket.read_buf(&mut buf).await {
508 Ok(n) => n,
509 Err(e) => {
510 log::error!("failed to read request from socket: {}", e);
511 return Err(GameStateIntegrationError::from(e));
512 }
513 };
514
515 let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
516 let mut r = httparse::Request::new(&mut headers);
517
518 request_length = match r.parse(&buf) {
519 Ok(httparse::Status::Complete(size)) => size,
520 Ok(httparse::Status::Partial) => {
521 log::debug!("partial request parsed, need to read more");
522 continue;
523 }
524 Err(e) => {
525 log::error!("failed to parse request: {}", e);
526 return Err(GameStateIntegrationError::from(e));
527 }
528 };
529 log::debug!("headers: {:?}", headers);
530 content_length = get_content_length_from_headers(&headers)?;
531 break;
532 }
533
534 let remaining = (request_length + content_length).saturating_sub(buf.len());
536 buf.reserve(remaining);
537
538 while buf.len() < request_length + content_length {
539 match socket.read_buf(&mut buf).await {
540 Ok(0) => {
541 log::error!("eof before receiving full body");
542 return Err(GameStateIntegrationError::UnexpectedEOF);
543 }
544 Ok(_) => {}
545 Err(e) => {
546 log::error!("failed to read body from socket: {}", e);
547 return Err(GameStateIntegrationError::from(e));
548 }
549 };
550 }
551
552 if let Err(e) = socket.write_all(OK.as_bytes()).await {
553 log::error!("failed to write to socket: {}", e);
554 return Err(GameStateIntegrationError::from(e));
555 };
556
557 Ok(buf.split_off(request_length).freeze())
558}
559
560pub fn get_content_length_from_headers(
562 headers: &[httparse::Header],
563) -> Result<usize, GameStateIntegrationError> {
564 match headers
565 .iter()
566 .filter(|h| h.name == "Content-Length")
567 .map(|h| h.value)
568 .next()
569 {
570 Some(value) => {
571 let str_length = match std::str::from_utf8(value) {
572 Ok(s) => s,
573 Err(e) => {
574 return Err(GameStateIntegrationError::InvalidContentLength(format!(
575 "failed to parse bytes as str: {}",
576 e
577 )));
578 }
579 };
580 match str_length.parse::<usize>() {
581 Ok(n) => Ok(n),
582 Err(e) => Err(GameStateIntegrationError::InvalidContentLength(format!(
583 "failed to parse str into usize: {}",
584 e
585 ))),
586 }
587 }
588 None => Err(GameStateIntegrationError::MissingContentLengthHeader),
589 }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use std::time;
    use tokio::sync::mpsc;
    use tokio::time::{sleep, timeout};

    /// Request line and headers as sent by the game client, without the
    /// `Content-Length` header and without the blank line ending the head.
    const REQUEST_HEAD: &str = "POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\n";

    /// Body used by every end-to-end test.
    const SAMPLE_BODY: &str = "{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";

    /// Assembles a request from `REQUEST_HEAD`, an optional `Content-Length`
    /// header value and a body.
    fn build_request(content_length: Option<&str>, body: &str) -> Vec<u8> {
        let mut request = String::from(REQUEST_HEAD);
        if let Some(value) = content_length {
            request.push_str("Content-Length: ");
            request.push_str(value);
            request.push_str("\r\n");
        }
        request.push_str("\r\n");
        request.push_str(body);
        request.into_bytes()
    }

    /// A complete, well-formed request carrying `SAMPLE_BODY`.
    fn sample_request() -> Vec<u8> {
        build_request(Some(&SAMPLE_BODY.len().to_string()), SAMPLE_BODY)
    }

    #[test]
    fn test_get_content_length_from_headers() {
        let request = build_request(Some("54943"), "");
        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
        let mut r = httparse::Request::new(&mut headers);
        r.parse(&request)
            .expect("parsing the request should never fail");

        let content_length =
            get_content_length_from_headers(&r.headers).expect("failed to get Content-Length");

        assert_eq!(content_length, 54943_usize);
    }

    #[test]
    fn test_get_content_length_from_headers_not_found() {
        let request = build_request(None, "");
        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
        let mut r = httparse::Request::new(&mut headers);
        r.parse(&request)
            .expect("parsing the request should never fail");

        let content_length = get_content_length_from_headers(&r.headers);

        assert!(matches!(
            content_length,
            Err(GameStateIntegrationError::MissingContentLengthHeader)
        ));
    }

    #[test]
    fn test_get_content_length_from_headers_not_a_number() {
        let request = build_request(Some("asdasd"), "");
        let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
        let mut r = httparse::Request::new(&mut headers);
        r.parse(&request)
            .expect("parsing the request should never fail");

        let content_length = get_content_length_from_headers(&r.headers);

        assert!(matches!(
            content_length,
            Err(GameStateIntegrationError::InvalidContentLength(_))
        ));
    }

    #[tokio::test]
    async fn test_process() {
        // Port 0 lets the OS pick a free port, so the test cannot collide with
        // other tests or with services already running on the machine.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("failed to bind to address");
        let local_addr = listener
            .local_addr()
            .expect("failed to read the bound address");
        let request = sample_request();

        // The accepting side plays the game client: it sends the request.
        tokio::spawn(async move {
            if let Ok((mut stream, _)) = listener.accept().await {
                let _ = stream.write_all(&request).await;
                let _ = stream.shutdown().await;
            }
        });

        let stream = TcpStream::connect(local_addr)
            .await
            .expect("failed to connect to address");

        let result = process(stream).await.expect("processing failed");
        assert_eq!(result.len(), SAMPLE_BODY.len());
        assert_eq!(result.as_ref(), SAMPLE_BODY.as_bytes());
    }

    #[tokio::test]
    async fn test_server_handles_events() {
        const URI: &str = "127.0.0.1:30080";
        let expected = bytes::Bytes::from_static(SAMPLE_BODY.as_bytes());

        let (tx1, mut rx1) = mpsc::channel::<bytes::Bytes>(2);
        let (tx2, mut rx2) = mpsc::channel::<bytes::Bytes>(2);

        let mut server = ServerBuilder::new(URI)
            .register(move |event: bytes::Bytes| {
                let tx1 = tx1.clone();
                async move { tx1.send(event).await }
            })
            .register(move |event: bytes::Bytes| {
                let tx2 = tx2.clone();
                async move { tx2.send(event).await }
            })
            .start()
            .unwrap();

        // Give the listener task time to bind.
        sleep(time::Duration::from_millis(10)).await;

        let request = sample_request();
        tokio::spawn(async move {
            for _ in 0..2 {
                let mut stream = TcpStream::connect(URI).await.unwrap();
                let _ = stream.write_all(&request).await;
                let _ = stream.shutdown().await;
            }
        });

        // Give the server time to process both requests.
        sleep(time::Duration::from_millis(10)).await;

        if timeout(time::Duration::from_secs(5), server.shutdown())
            .await
            .is_err()
        {
            panic!("did not shut down in 5 seconds");
        }

        /// Receives up to two events, stopping early if the channel closes.
        async fn collect_two(rx: &mut mpsc::Receiver<bytes::Bytes>) -> Vec<bytes::Bytes> {
            let mut received = Vec::with_capacity(2);
            while received.len() < 2 {
                match rx.recv().await {
                    Some(event) => received.push(event),
                    None => break,
                }
            }
            received
        }

        let (v1, v2) = timeout(time::Duration::from_secs(5), async {
            tokio::join!(collect_two(&mut rx1), collect_two(&mut rx2))
        })
        .await
        .expect("did not receive values within 5 seconds");

        assert_eq!(v1, vec![expected.clone(), expected.clone()]);
        assert_eq!(v2, vec![expected.clone(), expected.clone()]);
        assert!(server.is_shutdown());
    }

    #[tokio::test]
    async fn test_listener_shutsdown_when_all_handlers_fail() {
        const URI: &str = "127.0.0.1:40080";

        let mut server = ServerBuilder::new(URI)
            .register(move |_| async move { Err(anyhow::anyhow!("an error")) })
            .register(move |_| async move { Err(anyhow::anyhow!("another error")) })
            .start()
            .unwrap();

        // Give the listener task time to bind.
        sleep(time::Duration::from_millis(10)).await;

        let first_batch = sample_request();
        let follow_up = first_batch.clone();

        tokio::spawn(async move {
            for _ in 0..2 {
                let mut stream = TcpStream::connect(URI).await.unwrap();
                let _ = stream.write_all(&first_batch).await;
                let _ = stream.shutdown().await;
            }
        });

        // Give both handlers time to fail.
        sleep(time::Duration::from_millis(10)).await;

        // With every handler gone, this connection must make the listener give up.
        tokio::spawn(async move {
            let mut stream = TcpStream::connect(URI).await.unwrap();
            let _ = stream.write_all(&follow_up).await;
            let _ = stream.shutdown().await;
        });

        sleep(time::Duration::from_millis(10)).await;

        let result = timeout(time::Duration::from_secs(5), server.shutdown())
            .await
            .expect("did not finish in 5 seconds");
        let failure = result.expect_err("shutdown should report the failures");

        assert!(matches!(
            &failure.listener_error,
            Some(GameStateIntegrationError::NoHandlersAvailable)
        ));

        // Compare the handler errors by message. Binding a name inside a
        // `matches!` pattern would accept any value instead of comparing.
        // Sorted because handler tasks finish in no particular order.
        let handler_errors = failure
            .handler_errors
            .expect("handler errors should be reported");
        let mut messages: Vec<String> = handler_errors
            .iter()
            .map(|e| match e {
                GameStateIntegrationError::Handler { source } => source.to_string(),
                other => panic!("unexpected handler error: {}", other),
            })
            .collect();
        messages.sort();

        assert_eq!(messages, vec!["an error", "another error"]);
    }
}