dofus_framework/io/tcp/
server.rs

1use futures::stream::FuturesUnordered;
2use futures::StreamExt;
3use std::fmt::Formatter;
4use std::io;
5
6use thiserror::Error;
7use tokio::net::TcpListener;
8use tokio::select;
9use tokio::sync::mpsc;
10use tokio::task::JoinHandle;
11use tracing::{error, info};
12
13use crate::io::tcp::client_handler::{TcpClientTask, TcpClientTaskError};
14use crate::io::tcp::server::TcpServerError::{BindingError, SendMessageError};
15use crate::io::tcp::server::TcpServerMessage::Stop;
16use crate::io::tcp::tcp_client_action::TcpClientAction;
17
18pub(crate) const BUFFER_SIZE: usize = 8;
19
20pub struct TcpServer {}
21
22#[derive(Debug)]
23pub enum TcpServerMessage {
24    Stop,
25    Other,
26}
27
28impl std::fmt::Display for TcpServerMessage {
29    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
30        match self {
31            Stop => {
32                write!(f, "Stop")
33            }
34            TcpServerMessage::Other => {
35                write!(f, "Other")
36            }
37        }
38    }
39}
40
41#[derive(Debug, Error)]
42pub enum TcpServerError {
43    #[error("Error while trying to bind to the given address {address:?}, due to: {source:?}")]
44    BindingError { address: String, source: io::Error },
45    #[error("Error while accepting a new tcp client, due to: {0}")]
46    AcceptClientError(io::Error),
47    #[error("Error while sending the following command to the server: {0}")]
48    SendMessageError(TcpServerMessage),
49    #[error("An error occurred while joining the client tasks")]
50    JoiningError,
51    #[error("A client task exited with the following error: {0}")]
52    ClientTaskError(TcpClientTaskError),
53}
54
55pub struct TcpServerHandle(mpsc::Sender<TcpServerMessage>);
56
57impl TcpServer {
58    pub fn start(
59        sender: mpsc::Sender<TcpClientAction>,
60        address: String,
61        port: u16,
62    ) -> (TcpServerHandle, JoinHandle<Result<(), TcpServerError>>) {
63        let (tcp_server_sender, mut receiver) = mpsc::channel(BUFFER_SIZE);
64        let join_handle = tokio::spawn(async move {
65            let (listener, mut client_tasks) = Self::bind_address(address, port).await?;
66            let mut result: Result<(), TcpServerError> = Ok(());
67            loop {
68                select! {
69                     accept_result = listener.accept() => { // Listen to new clients.
70                        let (stream, _socket) = match accept_result {
71                            Ok((stream, _socket)) => (stream, _socket),
72                            Err(msg) => { // Stop server immediately if accept returns an error
73                                result = Err(TcpServerError::AcceptClientError(msg));
74                                break
75                            }
76                        };
77                        client_tasks.push(TcpClientTask::handle_client(stream, sender.clone()));
78                        // Create a TcpClientTask (which spawns a new task, to handle the client connection)
79                    }
80                    Some(_finished_client_task) = client_tasks.next() => { // Listen to exiting tcp client tasks
81                        //TODO handle client task exiting
82                    }
83                    Some(TcpServerMessage::Stop) = receiver.recv() => { // Listen to tcpserver handle messages (e.g stop server)
84                        info!("Server just stopped listening to messages");
85                        break
86                    }
87                }
88            }
89            //TODO do we need to signal the client tasks to stop?
90            futures::future::join_all(client_tasks).await; // Wait for all child tasks to terminate
91            result
92        });
93        (TcpServerHandle::new(tcp_server_sender), join_handle)
94    }
95
96    async fn bind_address(
97        address: String,
98        port: u16,
99    ) -> Result<
100        (
101            TcpListener,
102            FuturesUnordered<JoinHandle<Result<(), TcpClientTaskError>>>,
103        ),
104        TcpServerError,
105    > {
106        let binding = format!("{}:{}", address, port);
107        let listener = TcpListener::bind(&binding)
108            .await
109            .map_err(|e| BindingError {
110                source: e,
111                address: binding,
112            })?;
113        let client_tasks: FuturesUnordered<JoinHandle<Result<(), TcpClientTaskError>>> =
114            FuturesUnordered::new();
115        Ok((listener, client_tasks))
116    }
117}
118
119impl TcpServerHandle {
120    pub fn new(sender: mpsc::Sender<TcpServerMessage>) -> Self {
121        TcpServerHandle(sender)
122    }
123    pub fn stop(&self) -> Result<(), TcpServerError> {
124        self.0.try_send(Stop).map_err(|_| SendMessageError(Stop))
125    }
126}