dofus_framework/io/tcp/
server.rs1use futures::stream::FuturesUnordered;
2use futures::StreamExt;
3use std::fmt::Formatter;
4use std::io;
5
6use thiserror::Error;
7use tokio::net::TcpListener;
8use tokio::select;
9use tokio::sync::mpsc;
10use tokio::task::JoinHandle;
11use tracing::{error, info};
12
13use crate::io::tcp::client_handler::{TcpClientTask, TcpClientTaskError};
14use crate::io::tcp::server::TcpServerError::{BindingError, SendMessageError};
15use crate::io::tcp::server::TcpServerMessage::Stop;
16use crate::io::tcp::tcp_client_action::TcpClientAction;
17
/// Capacity, in messages, of the bounded channel that carries [`TcpServerMessage`]
/// commands from a [`TcpServerHandle`] to the server task (see `TcpServer::start`).
///
/// NOTE(review): the constant is `pub(crate)`, so other modules may size their own
/// channels with it as well — confirm before changing the value.
pub(crate) const BUFFER_SIZE: usize = 8;
19
/// Stateless entry point of the TCP server.
///
/// The type has no fields; the server is launched through the associated function
/// `TcpServer::start`, which never takes `self`.
pub struct TcpServer {}
21
/// Commands that can be sent to the running server task over its control channel.
#[derive(Debug)]
pub enum TcpServerMessage {
    /// Asks the server to leave its accept loop.
    Stop,
    /// Any other command; the accept loop shown in this file does not react to it.
    Other,
}
27
28impl std::fmt::Display for TcpServerMessage {
29 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
30 match self {
31 Stop => {
32 write!(f, "Stop")
33 }
34 TcpServerMessage::Other => {
35 write!(f, "Other")
36 }
37 }
38 }
39}
40
41#[derive(Debug, Error)]
42pub enum TcpServerError {
43 #[error("Error while trying to bind to the given address {address:?}, due to: {source:?}")]
44 BindingError { address: String, source: io::Error },
45 #[error("Error while accepting a new tcp client, due to: {0}")]
46 AcceptClientError(io::Error),
47 #[error("Error while sending the following command to the server: {0}")]
48 SendMessageError(TcpServerMessage),
49 #[error("An error occurred while joining the client tasks")]
50 JoiningError,
51 #[error("A client task exited with the following error: {0}")]
52 ClientTaskError(TcpClientTaskError),
53}
54
55pub struct TcpServerHandle(mpsc::Sender<TcpServerMessage>);
56
57impl TcpServer {
58 pub fn start(
59 sender: mpsc::Sender<TcpClientAction>,
60 address: String,
61 port: u16,
62 ) -> (TcpServerHandle, JoinHandle<Result<(), TcpServerError>>) {
63 let (tcp_server_sender, mut receiver) = mpsc::channel(BUFFER_SIZE);
64 let join_handle = tokio::spawn(async move {
65 let (listener, mut client_tasks) = Self::bind_address(address, port).await?;
66 let mut result: Result<(), TcpServerError> = Ok(());
67 loop {
68 select! {
69 accept_result = listener.accept() => { let (stream, _socket) = match accept_result {
71 Ok((stream, _socket)) => (stream, _socket),
72 Err(msg) => { result = Err(TcpServerError::AcceptClientError(msg));
74 break
75 }
76 };
77 client_tasks.push(TcpClientTask::handle_client(stream, sender.clone()));
78 }
80 Some(_finished_client_task) = client_tasks.next() => { }
83 Some(TcpServerMessage::Stop) = receiver.recv() => { info!("Server just stopped listening to messages");
85 break
86 }
87 }
88 }
89 futures::future::join_all(client_tasks).await; result
92 });
93 (TcpServerHandle::new(tcp_server_sender), join_handle)
94 }
95
96 async fn bind_address(
97 address: String,
98 port: u16,
99 ) -> Result<
100 (
101 TcpListener,
102 FuturesUnordered<JoinHandle<Result<(), TcpClientTaskError>>>,
103 ),
104 TcpServerError,
105 > {
106 let binding = format!("{}:{}", address, port);
107 let listener = TcpListener::bind(&binding)
108 .await
109 .map_err(|e| BindingError {
110 source: e,
111 address: binding,
112 })?;
113 let client_tasks: FuturesUnordered<JoinHandle<Result<(), TcpClientTaskError>>> =
114 FuturesUnordered::new();
115 Ok((listener, client_tasks))
116 }
117}
118
119impl TcpServerHandle {
120 pub fn new(sender: mpsc::Sender<TcpServerMessage>) -> Self {
121 TcpServerHandle(sender)
122 }
123 pub fn stop(&self) -> Result<(), TcpServerError> {
124 self.0.try_send(Stop).map_err(|_| SendMessageError(Stop))
125 }
126}