dofus_framework/io/tcp/
client_handler.rs

1use std::io;
2
3use thiserror::Error;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5use tokio::net::tcp::{ReadHalf, WriteHalf};
6use tokio::net::TcpStream;
7use tokio::select;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::TrySendError;
10use tokio::task::JoinHandle;
11
12use crate::io::tcp::client_handler::TcpClientTaskError::{
13    ClientConnectError, ClientDataError, ClientDisconnectError, ClientParseError, ClientReadError,
14    ClientWriteError,
15};
16use crate::io::tcp::server::BUFFER_SIZE;
17use crate::io::tcp::tcp_client_action::{TcpClientActionHandle, TcpClientActionSender};
18
/// Size, in bytes, of the buffer filled by each read from a client socket.
const CLIENT_BUFFER_SIZE: usize = 1024;
20
/// Commands delivered to a running client task through its message channel.
#[derive(Debug)]
pub enum TcpClientTaskMessage {
    /// Ask the client task to stop listening for further messages.
    Stop,
    /// Ask the client task to write `data` to the client.
    Send { data: String },
}
26
27#[derive(Debug, Clone)]
28pub struct TcpClientTaskHandle {
29    sender: mpsc::Sender<TcpClientTaskMessage>,
30}
31
32impl TcpClientTaskHandle {
33    pub fn new(sender: mpsc::Sender<TcpClientTaskMessage>) -> Self {
34        TcpClientTaskHandle { sender }
35    }
36
37    pub fn send_data(&self, data: String) -> Result<(), TrySendError<TcpClientTaskMessage>> {
38        self.sender.try_send(TcpClientTaskMessage::Send { data })
39    }
40
41    pub fn stop(&self) -> Result<(), TrySendError<TcpClientTaskMessage>> {
42        self.sender.try_send(TcpClientTaskMessage::Stop)
43    }
44}
45
/// Stateless type that groups the functions driving a single client connection.
pub(crate) struct TcpClientTask {}
47
/// Errors that can end a client task.
#[derive(Debug, Error)]
pub enum TcpClientTaskError {
    /// Writing to the client failed; carries the data that was being written
    /// and the underlying I/O error.
    #[error("Error while writing the data {0} to the client, due to: {1}")]
    ClientWriteError(String, io::Error),
    /// Bytes received from the client were not valid UTF-8.
    #[error("Error while trying to parse content from the client")]
    ClientParseError,
    /// Reading from the client socket failed.
    #[error("Error while trying to read data from the client")]
    ClientReadError,
    /// The "client connected" action could not be sent.
    #[error("Error while trying to send client connect action")]
    ClientConnectError,
    /// The "client disconnected" action could not be sent.
    #[error("Error while trying to send client disconnect action")]
    ClientDisconnectError,
    /// The "client data" action could not be sent.
    #[error("Error while trying to send client send data action")]
    ClientDataError,
}
63
64impl TcpClientTask {
65    pub fn handle_client(
66        mut client_stream: TcpStream,
67        tcp_client_action_sender: TcpClientActionSender,
68    ) -> JoinHandle<Result<(), TcpClientTaskError>> {
69        let (tcp_client_task_sender, tcp_client_task_receiver) = mpsc::channel(BUFFER_SIZE);
70        tokio::spawn(async move {
71            let (reader, writer) = client_stream.split();
72            select! {
73                res = Self::listen_to_messages(tcp_client_task_receiver, writer) => {res}
74                res = Self::listen_to_client(reader, tcp_client_action_sender, TcpClientTaskHandle::new(tcp_client_task_sender)) => {res}
75            }
76        })
77    }
78
79    async fn listen_to_client(
80        mut reader: ReadHalf<'_>,
81        tcp_client_action_sender: TcpClientActionSender,
82        tcp_client_task_handle: TcpClientTaskHandle,
83    ) -> Result<(), TcpClientTaskError> {
84        TcpClientActionHandle::new(
85            tcp_client_action_sender.clone(),
86            tcp_client_task_handle.clone(),
87        )
88        .client_connect()
89        .map_err(|_| ClientConnectError)?;
90        let mut buffer = [0u8; CLIENT_BUFFER_SIZE];
91        loop {
92            match reader.read(&mut buffer).await {
93                Ok(bytes_read) if bytes_read == 0 => {
94                    TcpClientActionHandle::new(
95                        tcp_client_action_sender.clone(),
96                        tcp_client_task_handle.clone(),
97                    )
98                    .client_disconnect()
99                    .map_err(|_| ClientDisconnectError)?;
100                    return Ok(());
101                }
102                Ok(bytes_read) => {
103                    let data_as_string = String::from_utf8(Vec::from(&buffer[0..bytes_read]))
104                        .map_err(|_| ClientParseError)?;
105                    TcpClientActionHandle::new(
106                        tcp_client_action_sender.clone(),
107                        tcp_client_task_handle.clone(),
108                    )
109                    .client_data(data_as_string)
110                    .map_err(|_| ClientDataError)?;
111                }
112                Err(_) => return Err(ClientReadError),
113            }
114        }
115    }
116
117    async fn listen_to_messages(
118        mut receiver: mpsc::Receiver<TcpClientTaskMessage>,
119        mut writer: WriteHalf<'_>,
120    ) -> Result<(), TcpClientTaskError> {
121        loop {
122            match receiver.recv().await {
123                None => return Ok(()),
124                Some(TcpClientTaskMessage::Stop) => return Ok(()),
125                Some(TcpClientTaskMessage::Send { data }) => writer
126                    .write_all(data.as_bytes())
127                    .await
128                    .map_err(|e| ClientWriteError(data, e)),
129            }?
130        }
131    }
132}