tcp_handler/protocols/compress.rs
1//! Compression protocol. Without encryption.
2//!
3//! With compression, you can reduce the size of the data sent by the server and the client.
4//!
5//! Set the compression level by calling [`tcp_handler::config::set_config`].
6//!
7//! # Example
8//! ```rust
9//! use anyhow::Result;
10//! use bytes::{Buf, BufMut, BytesMut};
11//! use tcp_handler::protocols::compress::*;
12//! use tokio::net::{TcpListener, TcpStream};
13//! use variable_len_reader::{VariableReader, VariableWriter};
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<()> {
17//! let server = TcpListener::bind("localhost:0").await?;
18//! let mut client = TcpStream::connect(server.local_addr()?).await?;
19//! let (mut server, _) = server.accept().await?;
20//!
21//! let c_init = client_init(&mut client, "test", "0").await;
22//! let s_init = server_init(&mut server, "test", |v| v == "0").await;
23//! server_start(&mut server, "test", "0", s_init).await?;
24//! client_start(&mut client, c_init).await?;
25//!
26//! let mut writer = BytesMut::new().writer();
27//! writer.write_string("hello server.")?;
28//! let mut bytes = writer.into_inner();
29//! send(&mut client, &mut bytes).await?;
30//!
31//! let mut reader = recv(&mut server).await?.reader();
32//! let message = reader.read_string()?;
33//! assert_eq!("hello server.", message);
34//!
35//! let mut writer = BytesMut::new().writer();
36//! writer.write_string("hello client.")?;
37//! let mut bytes = writer.into_inner();
38//! send(&mut server, &mut bytes).await?;
39//!
40//! let mut reader = recv(&mut client).await?.reader();
41//! let message = reader.read_string()?;
42//! assert_eq!("hello client.", message);
43//!
44//! Ok(())
45//! }
46//! ```
47//!
48//! The send process:
49//! ```text
50//! ┌────┬────────┬────────────┐ (It may not be in contiguous memory.)
51//! in --> │ ** │ ****** │ ********** │
52//! └────┴────────┴────────────┘
53//! │
54//! │─ DeflateEncoder
55//! v
56//! ┌──────────────────┐ (Compressed bytes. In contiguous memory.)
57//! out <-- │ **************** │
58//! └──────────────────┘
59//! ```
60//! The recv process:
61//! ```text
62//! ┌──────────────────┐ (Packet data.)
63//! in --> │ **************** │
64//! └──────────────────┘
65//! │
66//! │─ DeflateDecoder
67//! v
68//! ┌────────────────────┐ (Decompressed bytes.)
69//! out <-- │ ****************** │
70//! └────────────────────┘
71//! ```
72
73use bytes::{Buf, BufMut, BytesMut};
74use flate2::write::{DeflateDecoder, DeflateEncoder};
75use tokio::io::{AsyncRead, AsyncWrite};
76use tokio::task::block_in_place;
77use crate::config::get_compression;
78use crate::protocols::common::*;
79
80/// Init the client side in tcp-handler compress protocol.
81///
82/// Must be used in conjunction with [`client_start`].
83///
84/// # Arguments
85/// * `stream` - The tcp stream or `WriteHalf`.
86/// * `identifier` - The identifier of your application.
87/// * `version` - Current version of your application.
88///
89/// # Example
90/// ```rust,no_run
91/// use anyhow::Result;
92/// use tcp_handler::protocols::compress::{client_init, client_start};
93/// use tokio::net::TcpStream;
94///
95/// #[tokio::main]
96/// async fn main() -> Result<()> {
97/// let mut client = TcpStream::connect("localhost:25564").await?;
98/// let c_init = client_init(&mut client, "test", "0").await;
99/// client_start(&mut client, c_init).await?;
100/// // Now the client is ready to use.
101/// Ok(())
102/// }
103/// ```
104#[inline]
105pub async fn client_init<W: AsyncWrite + Unpin>(stream: &mut W, identifier: &str, version: &str) -> Result<(), StarterError> {
106 write_head(stream, ProtocolVariant::Compression, identifier, version).await?;
107 flush(stream).await?;
108 Ok(())
109}
110
111/// Init the server side in tcp-handler compress protocol.
112///
113/// Must be used in conjunction with `tcp_handler::compress::server_start`.
114///
115/// # Arguments
116/// * `stream` - The tcp stream or `ReadHalf`.
117/// * `identifier` - The identifier of your application.
118/// * `version` - A prediction to determine whether the client version is allowed.
119///
120/// # Example
121/// ```rust,no_run
122/// use anyhow::Result;
123/// use tcp_handler::protocols::compress::{server_init, server_start};
124/// use tokio::net::TcpListener;
125///
126/// #[tokio::main]
127/// async fn main() -> Result<()> {
128/// let server = TcpListener::bind("localhost:25564").await?;
129/// let (mut server, _) = server.accept().await?;
130/// let s_init = server_init(&mut server, "test", |v| v == "0").await;
131/// server_start(&mut server, "test", "0", s_init).await?;
132/// // Now the server is ready to use.
133/// Ok(())
134/// }
135/// ```
136#[inline]
137pub async fn server_init<R: AsyncRead + Unpin, P: FnOnce(&str) -> bool>(stream: &mut R, identifier: &str, version: P) -> Result<(u16, String), StarterError> {
138 read_head(stream, ProtocolVariant::Compression, identifier, version).await
139}
140
141/// Make sure the client side is ready to use in tcp-handler compress protocol.
142///
143/// Must be used in conjunction with [`client_init`].
144///
145/// # Arguments
146/// * `stream` - The tcp stream or `ReadHalf`.
147/// * `last` - The return value of [`client_init`].
148///
149/// # Example
150/// ```rust,no_run
151/// use anyhow::Result;
152/// use tcp_handler::protocols::compress::{client_init, client_start};
153/// use tokio::net::TcpStream;
154///
155/// #[tokio::main]
156/// async fn main() -> Result<()> {
157/// let mut client = TcpStream::connect("localhost:25564").await?;
158/// let c_init = client_init(&mut client, "test", "0").await;
159/// client_start(&mut client, c_init).await?;
160/// // Now the client is ready to use.
161/// Ok(())
162/// }
163/// ```
164#[inline]
165pub async fn client_start<R: AsyncRead + Unpin>(stream: &mut R, last: Result<(), StarterError>) -> Result<(), StarterError> {
166 read_last(stream, last).await
167}
168
169/// Make sure the server side is ready to use in tcp-handler compress protocol.
170///
171/// Must be used in conjunction with [`server_init`].
172///
173/// # Arguments
174/// * `stream` - The tcp stream or `WriteHalf`.
175/// * `identifier` - The returned application identifier.
176/// (Should be same with the para in [`server_init`].)
177/// * `version` - The returned recommended application version.
178/// (Should be passed the prediction in [`server_init`].)
179/// * `last` - The return value of [`server_init`].
180///
181/// # Example
182/// ```rust,no_run
183/// use anyhow::Result;
184/// use tcp_handler::protocols::compress::{server_init, server_start};
185/// use tokio::net::TcpListener;
186///
187/// #[tokio::main]
188/// async fn main() -> Result<()> {
189/// let server = TcpListener::bind("localhost:25564").await?;
190/// let (mut server, _) = server.accept().await?;
191/// let s_init = server_init(&mut server, "test", |v| v == "0").await;
192/// let (protocol_version, client_version) = server_start(&mut server, "test", "0", s_init).await?;
193/// // Now the server is ready to use.
194/// # let _ = protocol_version;
195/// # let _ = client_version;
196/// Ok(())
197/// }
198/// ```
199#[inline]
200pub async fn server_start<W: AsyncWrite + Unpin>(stream: &mut W, identifier: &str, version: &str, last: Result<(u16, String), StarterError>) -> Result<(u16, String), StarterError> {
201 let res = write_last(stream, ProtocolVariant::Compression, identifier, version, last).await?;
202 flush(stream).await?;
203 Ok(res)
204}
205
206/// Send the message in tcp-handler compress protocol.
207///
208/// # Runtime
209/// Due to call [`block_in_place`] internally,
210/// this function cannot be called in a `current_thread` runtime.
211///
212/// # Arguments
213/// * `stream` - The tcp stream or `WriteHalf`.
214/// * `message` - The message to send.
215///
216/// # Example
217/// ```rust,no_run
218/// # use anyhow::Result;
219/// # use bytes::{BufMut, BytesMut};
220/// # use tcp_handler::protocols::compress::{client_init, client_start};
221/// use tcp_handler::protocols::compress::send;
222/// # use tokio::net::TcpStream;
223/// # use variable_len_reader::VariableWriter;
224///
225/// # #[tokio::main]
226/// # async fn main() -> Result<()> {
227/// # let mut client = TcpStream::connect("localhost:25564").await?;
228/// # let c_init = client_init(&mut client, "test", "0").await;
229/// # client_start(&mut client, c_init).await?;
230/// let mut buffer = BytesMut::new().writer();
231/// buffer.write_string("hello server!")?;
232/// send(&mut client, &mut buffer.into_inner()).await?;
233/// # Ok(())
234/// # }
235/// ```
236pub async fn send<W: AsyncWrite + Unpin, B: Buf>(stream: &mut W, message: &mut B) -> Result<(), PacketError> {
237 let level = get_compression();
238 let mut bytes = block_in_place(move || {
239 use variable_len_reader::VariableWritable;
240 let mut encoder = DeflateEncoder::new(BytesMut::new().writer(), level);
241 encoder.write_more_buf(message)?;
242 Ok::<_, PacketError>(encoder.finish()?.into_inner())
243 })?;
244 write_packet(stream, &mut bytes).await?;
245 flush(stream).await?;
246 Ok(())
247}
248
249/// Recv the message in tcp-handler compress protocol.
250///
251/// # Runtime
252/// Due to call [`block_in_place`] internally,
253/// this function cannot be called in a `current_thread` runtime.
254///
255/// # Arguments
256/// * `stream` - The tcp stream or `ReadHalf`.
257///
258/// # Example
259/// ```rust,no_run
260/// # use anyhow::Result;
261/// # use bytes::Buf;
262/// # use tcp_handler::protocols::compress::{server_init, server_start};
263/// use tcp_handler::protocols::compress::recv;
264/// # use tokio::net::TcpListener;
265/// # use variable_len_reader::VariableReader;
266///
267/// # #[tokio::main]
268/// # async fn main() -> Result<()> {
269/// # let server = TcpListener::bind("localhost:25564").await?;
270/// # let (mut server, _) = server.accept().await?;
271/// # let s_init = server_init(&mut server, "test", |v| v == "0").await;
272/// # server_start(&mut server, "test", "0", s_init).await?;
273/// let mut reader = recv(&mut server).await?.reader();
274/// let message = reader.read_string()?;
275/// # let _ = message;
276/// # Ok(())
277/// # }
278/// ```
279pub async fn recv<R: AsyncRead + Unpin>(stream: &mut R) -> Result<BytesMut, PacketError> {
280 let mut bytes = read_packet(stream).await?;
281 let message = block_in_place(move || {
282 use variable_len_reader::VariableWritable;
283 let mut decoder = DeflateDecoder::new(BytesMut::new().writer());
284 decoder.write_more_buf(&mut bytes)?;
285 Ok::<_, PacketError>(decoder.finish()?.into_inner())
286 })?;
287 Ok(message)
288}
289
290#[cfg(test)]
291mod tests {
292 use anyhow::Result;
293 use variable_len_reader::{VariableReader, VariableWriter};
294 use crate::protocols::common::tests::create;
295 use crate::protocols::compress::*;
296
297 #[tokio::test(flavor = "multi_thread")]
298 async fn connect() -> Result<()> {
299 let (mut client, mut server) = create().await?;
300 let c = client_init(&mut client, "a", "1").await;
301 let s = server_init(&mut server, "a", |v| v == "1").await;
302 server_start(&mut server, "a", "1", s).await?;
303 client_start(&mut client, c).await?;
304for _ in 0..10 {
305 let mut writer = BytesMut::new().writer();
306 writer.write_string("hello server in compress.")?;
307 send(&mut client, &mut writer.into_inner()).await?;
308
309 let mut reader = recv(&mut server).await?.reader();
310 let message = reader.read_string()?;
311 assert_eq!("hello server in compress.", message);
312
313 let mut writer = BytesMut::new().writer();
314 writer.write_string("hello client in compress.")?;
315 send(&mut server, &mut writer.into_inner()).await?;
316
317 let mut reader = recv(&mut client).await?.reader();
318 let message = reader.read_string()?;
319 assert_eq!("hello client in compress.", message);
320}
321 Ok(())
322 }
323}