tcp_handler/protocols/
compress.rs

1//! Compression protocol. Without encryption.
2//!
3//! With compression, you can reduce the size of the data sent by the server and the client.
4//!
5//! Set the compression level by calling [`tcp_handler::config::set_config`].
6//!
7//! # Example
8//! ```rust
9//! use anyhow::Result;
10//! use bytes::{Buf, BufMut, BytesMut};
11//! use tcp_handler::protocols::compress::*;
12//! use tokio::net::{TcpListener, TcpStream};
13//! use variable_len_reader::{VariableReader, VariableWriter};
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<()> {
17//!     let server = TcpListener::bind("localhost:0").await?;
18//!     let mut client = TcpStream::connect(server.local_addr()?).await?;
19//!     let (mut server, _) = server.accept().await?;
20//!
21//!     let c_init = client_init(&mut client, "test", "0").await;
22//!     let s_init = server_init(&mut server, "test", |v| v == "0").await;
23//!     server_start(&mut server, "test", "0", s_init).await?;
24//!     client_start(&mut client, c_init).await?;
25//!
26//!     let mut writer = BytesMut::new().writer();
27//!     writer.write_string("hello server.")?;
28//!     let mut bytes = writer.into_inner();
29//!     send(&mut client, &mut bytes).await?;
30//!
31//!     let mut reader = recv(&mut server).await?.reader();
32//!     let message = reader.read_string()?;
33//!     assert_eq!("hello server.", message);
34//!
35//!     let mut writer = BytesMut::new().writer();
36//!     writer.write_string("hello client.")?;
37//!     let mut bytes = writer.into_inner();
38//!     send(&mut server, &mut bytes).await?;
39//!
40//!     let mut reader = recv(&mut client).await?.reader();
41//!     let message = reader.read_string()?;
42//!     assert_eq!("hello client.", message);
43//!
44//!     Ok(())
45//! }
46//! ```
47//!
48//! The send process:
49//! ```text
50//!         ┌────┬────────┬────────────┐ (It may not be in contiguous memory.)
51//! in  --> │ ** │ ****** │ ********** │
52//!         └────┴────────┴────────────┘
53//!           │
54//!           │─ DeflateEncoder
55//!           v
56//!         ┌──────────────────┐ (Compressed bytes. In contiguous memory.)
57//! out <-- │ **************** │
58//!         └──────────────────┘
59//! ```
60//! The recv process:
61//! ```text
62//!         ┌──────────────────┐ (Packet data.)
63//! in  --> │ **************** │
64//!         └──────────────────┘
65//!           │
66//!           │─ DeflateDecoder
67//!           v
68//!         ┌────────────────────┐ (Decompressed bytes.)
69//! out <-- │ ****************** │
70//!         └────────────────────┘
71//! ```
72
73use bytes::{Buf, BufMut, BytesMut};
74use flate2::write::{DeflateDecoder, DeflateEncoder};
75use tokio::io::{AsyncRead, AsyncWrite};
76use tokio::task::block_in_place;
77use crate::config::get_compression;
78use crate::protocols::common::*;
79
80/// Init the client side in tcp-handler compress protocol.
81///
82/// Must be used in conjunction with [`client_start`].
83///
84/// # Arguments
85///  * `stream` - The tcp stream or `WriteHalf`.
86///  * `identifier` - The identifier of your application.
87///  * `version` - Current version of your application.
88///
89/// # Example
90/// ```rust,no_run
91/// use anyhow::Result;
92/// use tcp_handler::protocols::compress::{client_init, client_start};
93/// use tokio::net::TcpStream;
94///
95/// #[tokio::main]
96/// async fn main() -> Result<()> {
97///     let mut client = TcpStream::connect("localhost:25564").await?;
98///     let c_init = client_init(&mut client, "test", "0").await;
99///     client_start(&mut client, c_init).await?;
100///     // Now the client is ready to use.
101///     Ok(())
102/// }
103/// ```
104#[inline]
105pub async fn client_init<W: AsyncWrite + Unpin>(stream: &mut W, identifier: &str, version: &str) -> Result<(), StarterError> {
106    write_head(stream, ProtocolVariant::Compression, identifier, version).await?;
107    flush(stream).await?;
108    Ok(())
109}
110
111/// Init the server side in tcp-handler compress protocol.
112///
113/// Must be used in conjunction with `tcp_handler::compress::server_start`.
114///
115/// # Arguments
116///  * `stream` - The tcp stream or `ReadHalf`.
117///  * `identifier` - The identifier of your application.
118///  * `version` - A prediction to determine whether the client version is allowed.
119///
120/// # Example
121/// ```rust,no_run
122/// use anyhow::Result;
123/// use tcp_handler::protocols::compress::{server_init, server_start};
124/// use tokio::net::TcpListener;
125///
126/// #[tokio::main]
127/// async fn main() -> Result<()> {
128///     let server = TcpListener::bind("localhost:25564").await?;
129///     let (mut server, _) = server.accept().await?;
130///     let s_init = server_init(&mut server, "test", |v| v == "0").await;
131///     server_start(&mut server, "test", "0", s_init).await?;
132///     // Now the server is ready to use.
133///     Ok(())
134/// }
135/// ```
136#[inline]
137pub async fn server_init<R: AsyncRead + Unpin, P: FnOnce(&str) -> bool>(stream: &mut R, identifier: &str, version: P) -> Result<(u16, String), StarterError> {
138    read_head(stream, ProtocolVariant::Compression, identifier, version).await
139}
140
141/// Make sure the client side is ready to use in tcp-handler compress protocol.
142///
143/// Must be used in conjunction with [`client_init`].
144///
145/// # Arguments
146///  * `stream` - The tcp stream or `ReadHalf`.
147///  * `last` - The return value of [`client_init`].
148///
149/// # Example
150/// ```rust,no_run
151/// use anyhow::Result;
152/// use tcp_handler::protocols::compress::{client_init, client_start};
153/// use tokio::net::TcpStream;
154///
155/// #[tokio::main]
156/// async fn main() -> Result<()> {
157///     let mut client = TcpStream::connect("localhost:25564").await?;
158///     let c_init = client_init(&mut client, "test", "0").await;
159///     client_start(&mut client, c_init).await?;
160///     // Now the client is ready to use.
161///     Ok(())
162/// }
163/// ```
164#[inline]
165pub async fn client_start<R: AsyncRead + Unpin>(stream: &mut R, last: Result<(), StarterError>) -> Result<(), StarterError> {
166    read_last(stream, last).await
167}
168
169/// Make sure the server side is ready to use in tcp-handler compress protocol.
170///
171/// Must be used in conjunction with [`server_init`].
172///
173/// # Arguments
174///  * `stream` - The tcp stream or `WriteHalf`.
175///  * `identifier` - The returned application identifier.
176/// (Should be same with the para in [`server_init`].)
177///  * `version` - The returned recommended application version.
178/// (Should be passed the prediction in [`server_init`].)
179///  * `last` - The return value of [`server_init`].
180///
181/// # Example
182/// ```rust,no_run
183/// use anyhow::Result;
184/// use tcp_handler::protocols::compress::{server_init, server_start};
185/// use tokio::net::TcpListener;
186///
187/// #[tokio::main]
188/// async fn main() -> Result<()> {
189///     let server = TcpListener::bind("localhost:25564").await?;
190///     let (mut server, _) = server.accept().await?;
191///     let s_init = server_init(&mut server, "test", |v| v == "0").await;
192///     let (protocol_version, client_version) = server_start(&mut server, "test", "0", s_init).await?;
193///     // Now the server is ready to use.
194///     # let _ = protocol_version;
195///     # let _ = client_version;
196///     Ok(())
197/// }
198/// ```
199#[inline]
200pub async fn server_start<W: AsyncWrite + Unpin>(stream: &mut W, identifier: &str, version: &str, last: Result<(u16, String), StarterError>) -> Result<(u16, String), StarterError> {
201    let res = write_last(stream, ProtocolVariant::Compression, identifier, version, last).await?;
202    flush(stream).await?;
203    Ok(res)
204}
205
206/// Send the message in tcp-handler compress protocol.
207///
208/// # Runtime
209/// Due to call [`block_in_place`] internally,
210/// this function cannot be called in a `current_thread` runtime.
211///
212/// # Arguments
213///  * `stream` - The tcp stream or `WriteHalf`.
214///  * `message` - The message to send.
215///
216/// # Example
217/// ```rust,no_run
218/// # use anyhow::Result;
219/// # use bytes::{BufMut, BytesMut};
220/// # use tcp_handler::protocols::compress::{client_init, client_start};
221/// use tcp_handler::protocols::compress::send;
222/// # use tokio::net::TcpStream;
223/// # use variable_len_reader::VariableWriter;
224///
225/// # #[tokio::main]
226/// # async fn main() -> Result<()> {
227/// #     let mut client = TcpStream::connect("localhost:25564").await?;
228/// #     let c_init = client_init(&mut client, "test", "0").await;
229/// #     client_start(&mut client, c_init).await?;
230/// let mut buffer = BytesMut::new().writer();
231/// buffer.write_string("hello server!")?;
232/// send(&mut client, &mut buffer.into_inner()).await?;
233/// #     Ok(())
234/// # }
235/// ```
236pub async fn send<W: AsyncWrite + Unpin, B: Buf>(stream: &mut W, message: &mut B) -> Result<(), PacketError> {
237    let level = get_compression();
238    let mut bytes = block_in_place(move || {
239        use variable_len_reader::VariableWritable;
240        let mut encoder = DeflateEncoder::new(BytesMut::new().writer(), level);
241        encoder.write_more_buf(message)?;
242        Ok::<_, PacketError>(encoder.finish()?.into_inner())
243    })?;
244    write_packet(stream, &mut bytes).await?;
245    flush(stream).await?;
246    Ok(())
247}
248
249/// Recv the message in tcp-handler compress protocol.
250///
251/// # Runtime
252/// Due to call [`block_in_place`] internally,
253/// this function cannot be called in a `current_thread` runtime.
254///
255/// # Arguments
256///  * `stream` - The tcp stream or `ReadHalf`.
257///
258/// # Example
259/// ```rust,no_run
260/// # use anyhow::Result;
261/// # use bytes::Buf;
262/// # use tcp_handler::protocols::compress::{server_init, server_start};
263/// use tcp_handler::protocols::compress::recv;
264/// # use tokio::net::TcpListener;
265/// # use variable_len_reader::VariableReader;
266///
267/// # #[tokio::main]
268/// # async fn main() -> Result<()> {
269/// #     let server = TcpListener::bind("localhost:25564").await?;
270/// #     let (mut server, _) = server.accept().await?;
271/// #     let s_init = server_init(&mut server, "test", |v| v == "0").await;
272/// #     server_start(&mut server, "test", "0", s_init).await?;
273/// let mut reader = recv(&mut server).await?.reader();
274/// let message = reader.read_string()?;
275/// #     let _ = message;
276/// #     Ok(())
277/// # }
278/// ```
279pub async fn recv<R: AsyncRead + Unpin>(stream: &mut R) -> Result<BytesMut, PacketError> {
280    let mut bytes = read_packet(stream).await?;
281    let message = block_in_place(move || {
282        use variable_len_reader::VariableWritable;
283        let mut decoder = DeflateDecoder::new(BytesMut::new().writer());
284        decoder.write_more_buf(&mut bytes)?;
285        Ok::<_, PacketError>(decoder.finish()?.into_inner())
286    })?;
287    Ok(message)
288}
289
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use variable_len_reader::{VariableReader, VariableWriter};
    use crate::protocols::common::tests::create;
    use crate::protocols::compress::*;

    /// Completes the handshake on a connected pair, then sends a string in
    /// each direction ten times and checks that it arrives unchanged.
    #[tokio::test(flavor = "multi_thread")]
    async fn connect() -> Result<()> {
        let (mut client, mut server) = create().await?;

        // Handshake: both init calls first, then both start calls.
        let client_head = client_init(&mut client, "a", "1").await;
        let server_head = server_init(&mut server, "a", |v| v == "1").await;
        server_start(&mut server, "a", "1", server_head).await?;
        client_start(&mut client, client_head).await?;

        for _ in 0..10 {
            // Client -> server.
            let mut outgoing = BytesMut::new().writer();
            outgoing.write_string("hello server in compress.")?;
            let mut payload = outgoing.into_inner();
            send(&mut client, &mut payload).await?;

            let mut incoming = recv(&mut server).await?.reader();
            let received = incoming.read_string()?;
            assert_eq!("hello server in compress.", received);

            // Server -> client.
            let mut outgoing = BytesMut::new().writer();
            outgoing.write_string("hello client in compress.")?;
            let mut payload = outgoing.into_inner();
            send(&mut server, &mut payload).await?;

            let mut incoming = recv(&mut client).await?.reader();
            let received = incoming.read_string()?;
            assert_eq!("hello client in compress.", received);
        }
        Ok(())
    }
}
323}