1use std::error::Error as StdError;
45use std::fmt::{self, Display};
46use std::future::Future;
47use std::io;
48
49use async_trait::async_trait;
50use futures::{StreamExt, stream};
51use tokio::io::{AsyncReadExt, AsyncWriteExt};
52use tokio::net::{TcpListener, TcpStream};
53use tokio::sync::broadcast;
54use tokio::task;
55
56pub mod components;
57pub mod handlers;
58
/// Initial capacity, in bytes, of the buffer each incoming request is read into.
const INITIAL_REQUEST_BUFFER_CAPACITY_BYTES: usize = 1024;

/// Size of the header array handed to `httparse` when parsing a request.
///
/// NOTE(review): a request carrying more headers than this is expected to be
/// rejected by `httparse` (too many headers) — confirm against the client.
const EXPECTED_NUMBER_OF_HEADERS: usize = 7;

/// Response written back to the client after a request has been read.
///
/// HTTP/1.1 requires CRLF line endings and an empty line terminating the
/// header section; `content-length: 0` tells the client no body follows, so it
/// does not have to wait for the connection to close.
const OK: &str = "HTTP/1.1 200 OK\r\ncontent-type: text/html\r\ncontent-length: 0\r\n\r\n";
71
72#[derive(thiserror::Error, Debug)]
73pub enum GameStateIntegrationError {
74 #[error("incomplete headers from game state integration request")]
75 IncompleteHeaders,
76 #[error("failed to read from socket")]
77 SocketRead(#[from] io::Error),
78 #[error("no handlers available to process request, is the server shutting down?")]
79 NoHandlersAvailable,
80 #[error("invalid content length header: {0}")]
81 InvalidContentLength(String),
82 #[error("missing Content-Length header in request")]
83 MissingContentLengthHeader,
84 #[error("invalid request received")]
85 InvalidRequest(#[from] httparse::Error),
86 #[error("server has already shutdown")]
87 ServerShutdown,
88 #[error("handler failed when handling event")]
89 Handler {
90 #[source]
91 source: anyhow::Error,
92 },
93 #[error("an error occurred while running the server")]
94 Unknown(#[from] task::JoinError),
95}
96
97pub type HandlerResult = Result<(), anyhow::Error>;
98
99#[async_trait]
103pub trait Handler: Send + Sync + 'static {
104 async fn handle(&self, event: bytes::Bytes) -> HandlerResult;
105}
106
107#[async_trait]
108impl<F, Fut, E> Handler for F
109where
110 F: Fn(bytes::Bytes) -> Fut + Send + Sync + 'static,
111 Fut: Future<Output = Result<(), E>> + Send,
112 E: Into<anyhow::Error>,
113{
114 async fn handle(&self, event: bytes::Bytes) -> HandlerResult {
115 (self)(event).await.map_err(|e| e.into())?;
116 Ok(())
117 }
118}
119
120pub(crate) struct HandlerRegistration {
122 inner: Box<dyn Handler>,
123 is_shutdown: bool,
124 notify: broadcast::Receiver<()>,
125 events: broadcast::Receiver<bytes::Bytes>,
126}
127
128impl HandlerRegistration {
129 pub(crate) fn new<H>(
130 handler: H,
131 notify: broadcast::Receiver<()>,
132 events: broadcast::Receiver<bytes::Bytes>,
133 ) -> Self
134 where
135 H: Handler,
136 {
137 Self {
138 inner: Box::new(handler),
139 is_shutdown: false,
140 notify,
141 events,
142 }
143 }
144
145 pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
146 loop {
147 tokio::select! {
148 received = self.events.recv() => {
149 match received {
150 Ok(event) => {
151 if let Err(e) = self.inner.handle(event).await {
152 return Err(GameStateIntegrationError::Handler{source: e});
153 };
154 },
155 Err(_) => {break;}
156 }
157 }
158 _ = self.notify.recv() => {
159 break;
160 }
161 }
162 }
163
164 self.is_shutdown = true;
165
166 Ok(())
167 }
168
169 #[allow(dead_code)]
170 pub(crate) fn is_shutdown(&self) -> bool {
171 self.is_shutdown
172 }
173}
174
175pub(crate) struct Listener {
177 uri: String,
178 is_shutdown: bool,
179 notify: broadcast::Receiver<()>,
180 send_events: broadcast::Sender<bytes::Bytes>,
181}
182
183impl Listener {
184 pub(crate) fn new(
185 uri: &str,
186 notify: broadcast::Receiver<()>,
187 send_events: broadcast::Sender<bytes::Bytes>,
188 ) -> Self {
189 Self {
190 uri: uri.to_owned(),
191 is_shutdown: false,
192 notify,
193 send_events,
194 }
195 }
196
197 pub(crate) async fn run(mut self) -> Result<(), GameStateIntegrationError> {
198 let listener = TcpListener::bind(&self.uri).await?;
199 log::info!("Listening on: {:?}", listener.local_addr());
200
201 loop {
202 tokio::select! {
203 accepted = listener.accept() => {
204 let (socket, _) = match accepted {
205 Ok(val) => val,
206 Err(e) => {
207 self.is_shutdown = true;
208 return Err(GameStateIntegrationError::SocketRead(e));
209 }
210 };
211
212 if self.send_events.receiver_count() == 0 {
213 return Err(GameStateIntegrationError::NoHandlersAvailable);
215 }
216
217 let sender = self.send_events.clone();
218
219 tokio::spawn(async move {
220 match process(socket).await {
221 Err(e) => {
222 log::error!("{}", e);
223 Err(e)
224 }
225 Ok(buf) => match sender.send(buf) {
226 Ok(_) => Ok(()),
227 Err(_) => {
228 Err(GameStateIntegrationError::NoHandlersAvailable)
231 }
232 },
233 }
234 });
235
236 }
237 _ = self.notify.recv() => {
238 break;
239 }
240 }
241 }
242
243 self.is_shutdown = true;
244
245 Ok(())
246 }
247
248 #[allow(dead_code)]
249 pub(crate) fn is_shutdown(&self) -> bool {
250 self.is_shutdown
251 }
252}
253
254#[derive(Debug)]
255pub struct ServerError {
256 listener_error: Option<GameStateIntegrationError>,
257 handler_errors: Option<Vec<GameStateIntegrationError>>,
258}
259
260impl Display for ServerError {
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 write!(f, "There were one or more errors while running the server")?;
263
264 if let Some(e) = self.listener_error.as_ref() {
265 writeln!(f)?;
266 write!(f, "- {:#}", e)?;
267 }
268
269 if let Some(errors) = self.handler_errors.as_ref() {
270 for e in errors {
271 writeln!(f)?;
272 write!(f, "- {:#}", e)?;
273 }
274 }
275
276 Ok(())
277 }
278}
279
280impl StdError for ServerError {}
281
282pub struct Server {
289 listener: Option<task::JoinHandle<Result<(), GameStateIntegrationError>>>,
290 handlers: Vec<task::JoinHandle<Result<(), GameStateIntegrationError>>>,
291 notify_shutdown: broadcast::Sender<()>,
292 is_shutdown: bool,
293}
294
295impl Server {
296 pub fn new(
297 listener: task::JoinHandle<Result<(), GameStateIntegrationError>>,
298 handlers: impl IntoIterator<Item = task::JoinHandle<Result<(), GameStateIntegrationError>>>,
299 notify_shutdown: broadcast::Sender<()>,
300 ) -> Self {
301 Self {
302 listener: Some(listener),
303 handlers: handlers.into_iter().collect(),
304 notify_shutdown,
305 is_shutdown: false,
306 }
307 }
308
309 pub async fn run_forever(&self) {
310 let _ = self.notify_shutdown.subscribe().recv().await;
311 }
312
313 pub async fn shutdown(&mut self) -> Result<(), ServerError> {
315 let _ = self.notify_shutdown.send(());
316
317 let listener_result = if let Some(listener) = self.listener.take() {
318 match listener.await {
319 Ok(r) => r,
320 Err(e) => Err(GameStateIntegrationError::Unknown(e)),
321 }
322 } else {
323 Ok(())
324 };
325
326 let mut handler_errors: Vec<GameStateIntegrationError> = Vec::new();
327 let mut futures: stream::FuturesUnordered<_> = self.handlers.drain(..).collect();
328 while let Some(result) = futures.next().await {
329 match result {
330 Ok(Err(e)) => handler_errors.push(e),
331 Err(e) => handler_errors.push(GameStateIntegrationError::from(e)),
332 Ok(Ok(())) => {}
333 }
334 }
335
336 self.is_shutdown = true;
337
338 match (listener_result, handler_errors.len()) {
339 (Ok(()), 0) => Ok(()),
340 (Err(e), 0) => Err(ServerError {
341 listener_error: Some(e),
342 handler_errors: None,
343 }),
344 (Ok(()), _) => Err(ServerError {
345 listener_error: None,
346 handler_errors: Some(handler_errors),
347 }),
348 (Err(e), _) => Err(ServerError {
349 listener_error: Some(e),
350 handler_errors: Some(handler_errors),
351 }),
352 }
353 }
354
355 pub fn is_shutdown(&self) -> bool {
356 self.is_shutdown
357 }
358}
359
360pub struct ServerBuilder {
361 uri: String,
362 handlers: Vec<HandlerRegistration>,
363 notify_shutdown: broadcast::Sender<()>,
364 send_events: broadcast::Sender<bytes::Bytes>,
365 is_shutdown: bool,
366}
367
368impl ServerBuilder {
369 pub fn new(uri: &str) -> Self {
373 let (notify_shutdown, _) = broadcast::channel(1);
374 let (send_events, _) = broadcast::channel(16);
375
376 Self {
377 uri: uri.to_owned(),
378 notify_shutdown,
379 send_events,
380 is_shutdown: false,
381 handlers: Vec::new(),
382 }
383 }
384
385 pub fn register<H>(mut self, handler: H) -> Self
389 where
390 H: Handler,
391 {
392 let registration = HandlerRegistration::new(
393 handler,
394 self.notify_shutdown.subscribe(),
395 self.send_events.subscribe(),
396 );
397 self.handlers.push(registration);
398
399 self
400 }
401
402 pub fn start(self) -> Result<Server, GameStateIntegrationError> {
404 if self.is_shutdown {
405 return Err(GameStateIntegrationError::ServerShutdown);
406 }
407
408 let listener = Listener::new(
409 &self.uri,
410 self.notify_shutdown.subscribe(),
411 self.send_events,
412 );
413
414 Ok(Server::new(
415 tokio::spawn(async move { listener.run().await }),
416 self.handlers
417 .into_iter()
418 .map(|h| tokio::spawn(async move { h.run().await })),
419 self.notify_shutdown,
420 ))
421 }
422}
423
424pub async fn process(mut socket: TcpStream) -> Result<bytes::Bytes, GameStateIntegrationError> {
429 if let Err(e) = socket.readable().await {
430 log::error!("socket is not readable");
431 return Err(GameStateIntegrationError::from(e));
432 };
433
434 let mut buf = bytes::BytesMut::with_capacity(INITIAL_REQUEST_BUFFER_CAPACITY_BYTES);
435 let request_length: usize;
436 let content_length: usize;
437
438 loop {
439 match socket.read_buf(&mut buf).await {
440 Ok(n) => n,
441 Err(e) => {
442 log::error!("failed to read request from socket: {}", e);
443 return Err(GameStateIntegrationError::from(e));
444 }
445 };
446
447 let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
448 let mut r = httparse::Request::new(&mut headers);
449
450 request_length = match r.parse(&buf) {
451 Ok(httparse::Status::Complete(size)) => size,
452 Ok(httparse::Status::Partial) => {
453 log::debug!("partial request parsed, need to read more");
454 continue;
455 }
456 Err(e) => {
457 log::error!("failed to parse request: {}", e);
458 return Err(GameStateIntegrationError::from(e));
459 }
460 };
461 log::debug!("headers: {:?}", headers);
462 content_length = get_content_length_from_headers(&headers)?;
463 break;
464 }
465
466 if buf.len() <= request_length + content_length {
467 buf.reserve(request_length + content_length);
468 match socket.read_buf(&mut buf).await {
469 Ok(n) => n,
470 Err(e) => {
471 log::error!("failed to read body from socket: {}", e);
472 return Err(GameStateIntegrationError::from(e));
473 }
474 };
475 }
476
477 if let Err(e) = socket.write_all(OK.as_bytes()).await {
478 log::error!("failed to write to socket: {}", e);
479 return Err(GameStateIntegrationError::from(e));
480 };
481
482 Ok(buf.split_off(request_length).freeze())
483}
484
485pub fn get_content_length_from_headers(
487 headers: &[httparse::Header],
488) -> Result<usize, GameStateIntegrationError> {
489 match headers
490 .iter()
491 .filter(|h| h.name == "Content-Length")
492 .map(|h| h.value)
493 .next()
494 {
495 Some(value) => {
496 let str_length = match std::str::from_utf8(value) {
497 Ok(s) => s,
498 Err(e) => {
499 return Err(GameStateIntegrationError::InvalidContentLength(format!(
500 "failed to parse bytes as str: {}",
501 e
502 )));
503 }
504 };
505 match str_length.parse::<usize>() {
506 Ok(n) => Ok(n),
507 Err(e) => Err(GameStateIntegrationError::InvalidContentLength(format!(
508 "failed to parse str into usize: {}",
509 e
510 ))),
511 }
512 }
513 None => Err(GameStateIntegrationError::MissingContentLengthHeader),
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520 use std::time;
521 use tokio::sync::mpsc;
522 use tokio::time::{sleep, timeout};
523
524 const TEST_URI: &'static str = "127.0.0.1:10080";
525
526 #[test]
527 fn test_get_content_length_from_headers() {
528 let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
529 let mut r = httparse::Request::new(&mut headers);
530 let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 54943\r\n\r\n";
531 r.parse(request_bytes)
532 .expect("parsing the request should never fail");
533
534 let expected = 54943 as usize;
535 let content_length =
536 get_content_length_from_headers(&r.headers).expect("failed to get Content-Length");
537
538 assert_eq!(content_length, expected);
539 }
540
541 #[test]
542 fn test_get_content_length_from_headers_not_found() {
543 let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
544 let mut r = httparse::Request::new(&mut headers);
545 let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\n\r\n";
546 r.parse(request_bytes)
547 .expect("parsing the request should never fail");
548
549 let content_length = get_content_length_from_headers(&r.headers);
550
551 assert!(matches!(
552 content_length,
553 Err(GameStateIntegrationError::MissingContentLengthHeader)
554 ));
555 }
556
557 #[test]
558 fn test_get_content_length_from_headers_not_a_number() {
559 let mut headers = [httparse::EMPTY_HEADER; EXPECTED_NUMBER_OF_HEADERS];
560 let mut r = httparse::Request::new(&mut headers);
561 let request_bytes = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: asdasd\r\n\r\n";
562 r.parse(request_bytes)
563 .expect("parsing the request should never fail");
564
565 let content_length = get_content_length_from_headers(&r.headers);
566
567 assert!(matches!(
568 content_length,
569 Err(GameStateIntegrationError::InvalidContentLength(_))
570 ));
571 }
572
573 #[tokio::test]
574 async fn test_process() {
575 let listener = TcpListener::bind(TEST_URI)
576 .await
577 .expect("failed to bind to address");
578 let local_addr = listener.local_addr().unwrap();
579 let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:3000\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
580 let expected = b"{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
581
582 tokio::spawn(async move {
583 if let Ok((mut stream, _)) = listener.accept().await {
584 let _ = stream.write_all(sample_request).await;
585 let _ = stream.shutdown().await;
586 }
587 });
588
589 let stream = TcpStream::connect(local_addr)
590 .await
591 .expect("failed to connect to address");
592
593 let result = process(stream).await.expect("processing failed");
594 assert_eq!(result.len(), expected.len());
595 assert_eq!(result.as_ref(), expected);
596 }
597
598 #[tokio::test]
599 async fn test_server_handles_events() {
600 let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:20080\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
601 let expected = bytes::Bytes::from_static(b"{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}");
602
603 let (tx1, mut rx1) = mpsc::channel(2);
604 let (tx2, mut rx2) = mpsc::channel(2);
605
606 let mut server = ServerBuilder::new("127.0.0.1:30080")
607 .register(move |event| {
608 let tx1 = tx1.clone();
609 async move {
610 let _ = &tx1.send(event).await?;
611 Ok::<(), mpsc::error::SendError<bytes::Bytes>>(())
612 }
613 })
614 .register(move |event| {
615 let tx2 = tx2.clone();
616 async move {
617 let _ = &tx2.send(event).await?;
618 Ok::<(), mpsc::error::SendError<bytes::Bytes>>(())
619 }
620 })
621 .start()
622 .unwrap();
623
624 sleep(time::Duration::from_millis(10)).await;
626
627 tokio::spawn(async move {
628 for _ in 0..2 {
629 let mut stream = TcpStream::connect("127.0.0.1:30080").await.unwrap();
630 let _ = stream.write_all(sample_request).await;
631 let _ = stream.shutdown().await;
632 }
633 });
634
635 sleep(time::Duration::from_millis(10)).await;
637
638 if let Err(_) = timeout(time::Duration::from_secs(5), server.shutdown()).await {
639 panic!("did not shut down in 5 seconds");
640 }
641
642 let mut v1 = Vec::new();
643 let mut v2 = Vec::new();
644
645 async fn capture(rx: &mut mpsc::Receiver<bytes::Bytes>, v: &mut Vec<bytes::Bytes>) {
646 let val = rx.recv().await;
647 v.push(val.unwrap());
648 }
649
650 if let Err(_) = timeout(time::Duration::from_secs(5), async {
651 tokio::join!(capture(&mut rx1, &mut v1), capture(&mut rx2, &mut v2));
652 tokio::join!(capture(&mut rx1, &mut v1), capture(&mut rx2, &mut v2));
653 })
654 .await
655 {
656 println!("did not receive values within 5 seconds");
657 }
658
659 assert_eq!(v1.len(), 2);
660 assert_eq!(v2.len(), 2);
661 assert_eq!(v1[0], &expected);
662 assert_eq!(v1[1], &expected);
663 assert_eq!(v2[0], &expected);
664 assert_eq!(v2[1], &expected);
665 assert!(server.is_shutdown());
666 }
667
668 #[tokio::test]
669 async fn test_listener_shutsdown_when_all_handlers_fail() {
670 let sample_request = b"POST / HTTP/1.1\r\nuser-agent: Valve/Steam HTTP Client 1.0 (570)\r\nContent-Type: application/json\r\nHost: 127.0.0.1:20080\r\nAccept: text/html,*/*;q=0.9\r\naccept-encoding: gzip,identity,*;q=0\r\naccept-charset: ISO-8859-1,utf-8,*;q=0.7\r\nContent-Length: 173\r\n\r\n{\n\t\"provider\": {\n\t\t\"name\": \"Dota 2\",\n\t\t\"appid\": 570,\n\t\t\"version\": 47,\n\t\t\"timestamp\": 1688514013\n\t},\n\t\"player\": {\n\n\t},\n\t\"draft\": {\n\n\t},\n\t\"auth\": {\n\t\t\"token\": \"hello1234\"\n\t}\n}";
671
672 let mut server = ServerBuilder::new("127.0.0.1:40080")
673 .register(move |_| async move { Err(anyhow::anyhow!("an error")) })
674 .register(move |_| async move { Err(anyhow::anyhow!("another error")) })
675 .start()
676 .unwrap();
677
678 sleep(time::Duration::from_millis(10)).await;
680
681 tokio::spawn(async move {
682 for _ in 0..2 {
683 let mut stream = TcpStream::connect("127.0.0.1:40080").await.unwrap();
684 let _ = stream.write_all(sample_request).await;
685 let _ = stream.shutdown().await;
686 }
687 });
688
689 sleep(time::Duration::from_millis(10)).await;
691
692 tokio::spawn(async move {
694 let mut stream = TcpStream::connect("127.0.0.1:40080").await.unwrap();
695 let _ = stream.write_all(sample_request).await;
696 let _ = stream.shutdown().await;
697 });
698
699 sleep(time::Duration::from_millis(10)).await;
701
702 let _expected_handler_errors: Vec<GameStateIntegrationError> = vec![
703 GameStateIntegrationError::Handler {
704 source: anyhow::anyhow!("an error"),
705 },
706 GameStateIntegrationError::Handler {
707 source: anyhow::anyhow!("another error"),
708 },
709 ];
710 match timeout(time::Duration::from_secs(5), server.shutdown()).await {
711 Err(_) => {
712 panic!("did not finish in 5 seconds");
713 }
714 Ok(result) => {
715 assert!(matches!(
716 result,
717 Err(ServerError {
718 listener_error: Some(GameStateIntegrationError::NoHandlersAvailable),
719 handler_errors: Some(_expected_handler_errors)
720 })
721 ));
722 }
723 }
724 }
725}