dofus_framework/io/tcp/
client_handler.rs1use std::io;
2
3use thiserror::Error;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5use tokio::net::tcp::{ReadHalf, WriteHalf};
6use tokio::net::TcpStream;
7use tokio::select;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::TrySendError;
10use tokio::task::JoinHandle;
11
12use crate::io::tcp::client_handler::TcpClientTaskError::{
13 ClientConnectError, ClientDataError, ClientDisconnectError, ClientParseError, ClientReadError,
14 ClientWriteError,
15};
16use crate::io::tcp::server::BUFFER_SIZE;
17use crate::io::tcp::tcp_client_action::{TcpClientActionHandle, TcpClientActionSender};
18
/// Size, in bytes, of the stack buffer used for each read from the client socket.
const CLIENT_BUFFER_SIZE: usize = 1024;
20
/// Commands that can be sent to a running client task through its
/// [`TcpClientTaskHandle`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TcpClientTaskMessage {
    /// Ask the task to stop serving the client.
    Stop,
    /// Ask the task to write `data` to the client socket.
    Send { data: String },
}
26
27#[derive(Debug, Clone)]
28pub struct TcpClientTaskHandle {
29 sender: mpsc::Sender<TcpClientTaskMessage>,
30}
31
32impl TcpClientTaskHandle {
33 pub fn new(sender: mpsc::Sender<TcpClientTaskMessage>) -> Self {
34 TcpClientTaskHandle { sender }
35 }
36
37 pub fn send_data(&self, data: String) -> Result<(), TrySendError<TcpClientTaskMessage>> {
38 self.sender.try_send(TcpClientTaskMessage::Send { data })
39 }
40
41 pub fn stop(&self) -> Result<(), TrySendError<TcpClientTaskMessage>> {
42 self.sender.try_send(TcpClientTaskMessage::Stop)
43 }
44}
45
/// Stateless namespace for the functions that drive a single client connection.
pub(crate) struct TcpClientTask {}
47
48#[derive(Debug, Error)]
49pub enum TcpClientTaskError {
50 #[error("Error while writing the data {0} to the client, due to: {1}")]
51 ClientWriteError(String, io::Error),
52 #[error("Error while trying to parse content from the client")]
53 ClientParseError,
54 #[error("Error while trying to read data from the client")]
55 ClientReadError,
56 #[error("Error while trying to send client connect action")]
57 ClientConnectError,
58 #[error("Error while trying to send client disconnect action")]
59 ClientDisconnectError,
60 #[error("Error while trying to send client send data action")]
61 ClientDataError,
62}
63
64impl TcpClientTask {
65 pub fn handle_client(
66 mut client_stream: TcpStream,
67 tcp_client_action_sender: TcpClientActionSender,
68 ) -> JoinHandle<Result<(), TcpClientTaskError>> {
69 let (tcp_client_task_sender, tcp_client_task_receiver) = mpsc::channel(BUFFER_SIZE);
70 tokio::spawn(async move {
71 let (reader, writer) = client_stream.split();
72 select! {
73 res = Self::listen_to_messages(tcp_client_task_receiver, writer) => {res}
74 res = Self::listen_to_client(reader, tcp_client_action_sender, TcpClientTaskHandle::new(tcp_client_task_sender)) => {res}
75 }
76 })
77 }
78
79 async fn listen_to_client(
80 mut reader: ReadHalf<'_>,
81 tcp_client_action_sender: TcpClientActionSender,
82 tcp_client_task_handle: TcpClientTaskHandle,
83 ) -> Result<(), TcpClientTaskError> {
84 TcpClientActionHandle::new(
85 tcp_client_action_sender.clone(),
86 tcp_client_task_handle.clone(),
87 )
88 .client_connect()
89 .map_err(|_| ClientConnectError)?;
90 let mut buffer = [0u8; CLIENT_BUFFER_SIZE];
91 loop {
92 match reader.read(&mut buffer).await {
93 Ok(bytes_read) if bytes_read == 0 => {
94 TcpClientActionHandle::new(
95 tcp_client_action_sender.clone(),
96 tcp_client_task_handle.clone(),
97 )
98 .client_disconnect()
99 .map_err(|_| ClientDisconnectError)?;
100 return Ok(());
101 }
102 Ok(bytes_read) => {
103 let data_as_string = String::from_utf8(Vec::from(&buffer[0..bytes_read]))
104 .map_err(|_| ClientParseError)?;
105 TcpClientActionHandle::new(
106 tcp_client_action_sender.clone(),
107 tcp_client_task_handle.clone(),
108 )
109 .client_data(data_as_string)
110 .map_err(|_| ClientDataError)?;
111 }
112 Err(_) => return Err(ClientReadError),
113 }
114 }
115 }
116
117 async fn listen_to_messages(
118 mut receiver: mpsc::Receiver<TcpClientTaskMessage>,
119 mut writer: WriteHalf<'_>,
120 ) -> Result<(), TcpClientTaskError> {
121 loop {
122 match receiver.recv().await {
123 None => return Ok(()),
124 Some(TcpClientTaskMessage::Stop) => return Ok(()),
125 Some(TcpClientTaskMessage::Send { data }) => writer
126 .write_all(data.as_bytes())
127 .await
128 .map_err(|e| ClientWriteError(data, e)),
129 }?
130 }
131 }
132}