1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use std::io;

use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::task::JoinHandle;

use crate::io::tcp::client_handler::TcpClientTaskError::{
    ClientConnectError, ClientDataError, ClientDisconnectError, ClientParseError, ClientReadError,
    ClientWriteError,
};
use crate::io::tcp::server::BUFFER_SIZE;
use crate::io::tcp::tcp_client_action::{TcpClientActionHandle, TcpClientActionSender};

const CLIENT_BUFFER_SIZE: usize = 1024;

#[derive(Debug)]
pub enum TcpClientTaskMessage {
    Stop,
    Send { data: String },
}

#[derive(Debug, Clone)]
pub struct TcpClientTaskHandle {
    sender: mpsc::Sender<TcpClientTaskMessage>,
}

impl TcpClientTaskHandle {
    pub fn new(sender: mpsc::Sender<TcpClientTaskMessage>) -> Self {
        TcpClientTaskHandle { sender }
    }

    pub async fn send_data(&self, data: String) -> Result<(), TrySendError<TcpClientTaskMessage>> {
        self.sender.try_send(TcpClientTaskMessage::Send { data })
    }

    pub async fn stop(&self) -> Result<(), TrySendError<TcpClientTaskMessage>> {
        self.sender.try_send(TcpClientTaskMessage::Stop)
    }
}

pub(crate) struct TcpClientTask {}

#[derive(Debug, Error)]
pub enum TcpClientTaskError {
    #[error("Error while writing the data {0} to the client, due to: {1}")]
    ClientWriteError(String, io::Error),
    #[error("Error while trying to parse content from the client")]
    ClientParseError,
    #[error("Error while trying to read data from the client")]
    ClientReadError,
    #[error("Error while trying to send client connect action")]
    ClientConnectError,
    #[error("Error while trying to send client disconnect action")]
    ClientDisconnectError,
    #[error("Error while trying to send client send data action")]
    ClientDataError,
}

impl TcpClientTask {
    pub fn handle_client(
        mut client_stream: TcpStream,
        tcp_client_action_sender: TcpClientActionSender,
    ) -> JoinHandle<Result<(), TcpClientTaskError>> {
        let (tcp_client_task_sender, tcp_client_task_receiver) = mpsc::channel(BUFFER_SIZE);
        tokio::spawn(async move {
            let (reader, writer) = client_stream.split();
            select! {
                res = Self::listen_to_messages(tcp_client_task_receiver, writer) => {res?}
                res = Self::listen_to_client(reader, tcp_client_action_sender, TcpClientTaskHandle::new(tcp_client_task_sender)) => {res?}
            }
            Ok::<(), TcpClientTaskError>(())
        })
    }

    async fn listen_to_client(
        mut reader: ReadHalf<'_>,
        tcp_client_action_sender: TcpClientActionSender,
        tcp_client_task_handle: TcpClientTaskHandle,
    ) -> Result<(), TcpClientTaskError> {
        TcpClientActionHandle::new(
            tcp_client_action_sender.clone(),
            tcp_client_task_handle.clone(),
        )
        .client_connect()
        .map_err(|_| ClientConnectError)?;
        let mut buffer = [0u8; CLIENT_BUFFER_SIZE];
        loop {
            match reader.read(&mut buffer).await {
                Ok(bytes_read) if bytes_read == 0 => {
                    TcpClientActionHandle::new(
                        tcp_client_action_sender.clone(),
                        tcp_client_task_handle.clone(),
                    )
                    .client_disconnect()
                    .map_err(|_| ClientDisconnectError)?;
                    return Ok(());
                }
                Ok(bytes_read) => {
                    let data_as_string = String::from_utf8(Vec::from(&buffer[0..bytes_read]))
                        .map_err(|_| ClientParseError)?;
                    TcpClientActionHandle::new(
                        tcp_client_action_sender.clone(),
                        tcp_client_task_handle.clone(),
                    )
                    .client_data(data_as_string)
                    .map_err(|_| ClientDataError)?;
                }
                Err(_) => return Err(ClientReadError),
            }
        }
    }

    async fn listen_to_messages(
        mut receiver: mpsc::Receiver<TcpClientTaskMessage>,
        mut writer: WriteHalf<'_>,
    ) -> Result<(), TcpClientTaskError> {
        loop {
            match receiver.recv().await {
                None => return Ok(()),
                Some(TcpClientTaskMessage::Stop) => return Ok(()),
                Some(TcpClientTaskMessage::Send { data }) => writer
                    .write_all(data.as_bytes())
                    .await
                    .map_err(|e| ClientWriteError(data, e)),
            }?
        }
    }
}