tcp_handler/streams/
compress_encrypt.rs

//! `TcpHandler` wraps the [`crate::protocols::compress_encrypt`] protocol.
2
3use bytes::{Buf, BytesMut};
4use tokio::io::{AsyncRead, AsyncWrite};
5use crate::protocols::common::{Cipher, PacketError, StarterError};
6use crate::protocols::compress_encrypt::{self, send, recv};
7use crate::streams::impl_tcp_handler;
8
9/// The server side `TcpHandler` of the `compress_encrypt` protocol.
10#[derive(Debug)]
11pub struct TcpServerHandlerCompressEncrypt<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
12    reader: R,
13    writer: W,
14    cipher: Cipher,
15    version: String,
16}
17
18impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> TcpServerHandlerCompressEncrypt<R, W> {
19    /// Create and init a new `TcpServerHandlerCompressEncrypt`.
20    pub async fn new<P: FnOnce(&str) -> bool>(mut reader: R, mut writer: W, identifier: &str, version_prediction: P, version: &str) -> Result<Self, StarterError> {
21        let init = compress_encrypt::server_init(&mut reader, identifier, version_prediction).await;
22        let (cipher, _protocol_version, application_version) = compress_encrypt::server_start(&mut writer, identifier, version, init).await?;
23        Ok(Self { reader, writer, cipher, version: application_version })
24    }
25
26    /// Deconstruct the `TcpServerHandlerCompressEncrypt` into the inner parts.
27    #[inline]
28    pub fn into_inner(self) -> (R, W, Cipher) {
29        (self.reader, self.writer, self.cipher)
30    }
31
32    /// **Unsafe!!!**
33    /// Construct the `TcpServerHandlerCompressEncrypt` from the inner parts.
34    #[inline]
35    pub fn from_inner(reader: R, writer: W, cipher: Cipher, version: String) -> Self {
36        Self { reader, writer, cipher, version }
37    }
38
39    /// Send a message to the client.
40    #[inline]
41    pub async fn send<B: Buf>(&mut self, message: &mut B) -> Result<(), PacketError> {
42        send(&mut self.writer, message, &self.cipher).await
43    }
44
45    /// Receive a message from the client.
46    #[inline]
47    pub async fn recv(&mut self) -> Result<BytesMut, PacketError> {
48        recv(&mut self.reader, &self.cipher).await
49    }
50}
51
52impl_tcp_handler!(server TcpServerHandlerCompressEncrypt);
53
54
55/// The client side `TcpHandler` of the `compress_encrypt` protocol.
56#[derive(Debug)]
57pub struct TcpClientHandlerCompressEncrypt<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
58    reader: R,
59    writer: W,
60    cipher: Cipher,
61}
62
63impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> TcpClientHandlerCompressEncrypt<R, W> {
64    /// Create and init a new `TcpClientHandlerCompressEncrypt`.
65    pub async fn new(mut reader: R, mut writer: W, identifier: &str, version: &str) -> Result<Self, StarterError> {
66        let init = compress_encrypt::client_init(&mut writer, identifier, version).await;
67        let cipher = compress_encrypt::client_start(&mut reader, init).await?;
68        Ok(Self { reader, writer, cipher })
69    }
70
71    /// Deconstruct the `TcpClientHandlerCompressEncrypt` into the inner parts.
72    #[inline]
73    pub fn into_inner(self) -> (R, W, Cipher) {
74        (self.reader, self.writer, self.cipher)
75    }
76
77    /// **Unsafe!!!**
78    /// Construct the `TcpClientHandlerCompressEncrypt` from the inner parts.
79    #[inline]
80    pub fn from_inner(reader: R, writer: W, cipher: Cipher) -> Self {
81        Self { reader, writer, cipher }
82    }
83
84    /// Send a message to the server.
85    #[inline]
86    pub async fn send<B: Buf>(&mut self, message: &mut B) -> Result<(), PacketError> {
87        send(&mut self.writer, message, &self.cipher).await
88    }
89
90    /// Receive a message from the server.
91    #[inline]
92    pub async fn recv(&mut self) -> Result<BytesMut, PacketError> {
93        recv(&mut self.reader, &self.cipher).await
94    }
95}
96
97impl_tcp_handler!(client TcpClientHandlerCompressEncrypt);
98
99
#[cfg(test)]
mod tests {
    use anyhow::Result;
    // NOTE(review): `Buf`, `BufMut`, `VariableReader` and `VariableWriter` are
    // not named directly below; presumably the `check_send_recv!` expansion
    // relies on them being in scope — confirm before removing.
    use bytes::{Buf, BufMut};
    use tokio::{spawn, try_join};
    use variable_len_reader::{VariableReader, VariableWriter};
    use crate::streams::compress_encrypt::{TcpClientHandlerCompressEncrypt, TcpServerHandlerCompressEncrypt};
    use crate::streams::tests::{check_send_recv, create};

    /// Handshake a server and a client concurrently, then exchange one
    /// message in each direction.
    #[tokio::test(flavor = "multi_thread")]
    async fn connect() -> Result<()> {
        let (client_reader, client_writer, server_reader, server_writer) = create().await?;

        // Both handshakes run as separate tasks so each side can make
        // progress while the other is waiting.
        let server_task = spawn(TcpServerHandlerCompressEncrypt::new(server_reader, server_writer, "test", |v| v == "0", "0"));
        let client_task = spawn(TcpClientHandlerCompressEncrypt::new(client_reader, client_writer, "test", "0"));

        // First `?` surfaces a task join failure, the next two surface
        // handshake failures (server first, then client).
        let (server_outcome, client_outcome) = try_join!(server_task, client_task)?;
        let mut server = server_outcome?;
        let mut client = client_outcome?;

        assert_eq!("0", server.get_client_version());
        check_send_recv!(server, client, "Hello tcp-handler encrypt.");
        check_send_recv!(client, server, "Hello tcp-handler encrypt.");
        Ok(())
    }
}