1#![doc = include_str!("../README.md")]
2
3mod api;
4mod parser;
5mod serialiser;
6mod service;
7
8pub use crate::api::*;
9pub use crate::service::*;
10
11#[cfg(test)]
12mod tests {
13
14 use std::{
15 any::Any,
16 collections::VecDeque,
17 io::ErrorKind,
18 ops::{Deref, Range},
19 };
20
21 use anyhow::anyhow;
22 use rand::RngCore;
23 use tracing_test::traced_test;
24
25 use super::*;
26
27 #[tokio::test(flavor = "multi_thread")]
28 #[traced_test]
29 async fn test_txrx_sequential_payloads() -> Result<(), anyhow::Error> {
30 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
31 let server = TcpTpktServer::listen(test_address).await?;
32
33 let (mut client_reader, mut client_writer, mut server_reader, mut server_writer) = {
35 let client_connection = TcpTpktConnection::connect(test_address).await?;
36 let server_connection = server.accept().await?;
37 match (server_connection.get_protocol_infomation_list().get(0).ok_or_else(|| anyhow!("Test Failed"))?.deref() as &dyn Any).downcast_ref::<TcpTpktProtocolInformation>() {
38 Some(info) => assert!(info.remote_address.to_string().starts_with("127.0.0.1:")),
39 None => return Err(anyhow!("Test Failed")),
40 };
41
42 let (client_reader, client_writer) = client_connection.split().await?;
43 let (server_reader, server_writer) = server_connection.split().await?;
44 (client_reader, client_writer, server_reader, server_writer)
45 };
46
47 server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
48 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
49
50 drop(server);
51
52 for _ in 0..1000 {
53 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
54 assert_eq!(data, Vec::from(b"Hello"));
55 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
56
57 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
58 assert_eq!(data, Vec::from(b"World"));
59 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
60
61 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
62 assert_eq!(data, Vec::from(b"World"));
63 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
64
65 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
66 assert_eq!(data, Vec::from(b"Hello"));
67 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
68 }
69
70 assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
72 assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"Hello"));
73
74 drop(server_writer);
75 drop(server_reader);
76
77 match client_reader.recv().await? {
78 None => (),
79 _ => return Err(anyhow!("Failed to close connection gracefully.")),
80 };
81
82 Ok(())
83 }
84
85 #[tokio::test]
86 #[traced_test]
87 async fn test_txrx_concurrent_payloads() -> Result<(), anyhow::Error> {
88 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
89 let server = TcpTpktServer::listen(test_address).await?;
90
91 let client_connection = TcpTpktConnection::connect(test_address).await?;
92 let server_connection = server.accept().await?;
93
94 match (client_connection.get_protocol_infomation_list().get(0).ok_or_else(|| anyhow!("Test Failed"))?.deref() as &dyn Any).downcast_ref::<TcpTpktProtocolInformation>() {
95 Some(info) => assert!(info.remote_address.to_string().starts_with("127.0.0.1:")),
96 None => return Err(anyhow!("Test Failed")),
97 };
98 match (server_connection.get_protocol_infomation_list().get(0).ok_or_else(|| anyhow!("Test Failed"))?.deref() as &dyn Any).downcast_ref::<TcpTpktProtocolInformation>() {
99 Some(info) => assert!(info.remote_address.to_string().starts_with("127.0.0.1:")),
100 None => return Err(anyhow!("Test Failed")),
101 };
102
103 let (mut client_reader, mut client_writer) = client_connection.split().await?;
104 let (mut server_reader, mut server_writer) = server_connection.split().await?;
105
106 server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
107 server_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
108
109 drop(server);
110
111 for _ in 0..1000 {
112 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
113 assert_eq!(data, Vec::from(b"Hello"));
114 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
115
116 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
117 assert_eq!(data, Vec::from(b"World"));
118 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
119
120 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
121 assert_eq!(data, Vec::from(b"Hello"));
122 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
123
124 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
125 assert_eq!(data, Vec::from(b"World"));
126 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
127 }
128
129 assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"Hello"));
130 assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
131
132 drop(client_writer);
133 drop(client_reader);
134
135 match server_reader.recv().await? {
137 None => (),
138 _ => return Err(anyhow!("Failed to close connection gracefully.")),
139 };
140
141 Ok(())
142 }
143
144 #[tokio::test]
145 #[traced_test]
146 async fn test_txrx_sequential_ungraceful_shutdown() -> Result<(), anyhow::Error> {
147 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
148 let server = TcpTpktServer::listen(test_address).await?;
149
150 let client_connection = TcpTpktConnection::connect(test_address).await?;
151 let server_connection = server.accept().await?;
152
153 let (mut client_reader, mut client_writer) = client_connection.split().await?;
154 let (mut server_reader, mut server_writer) = server_connection.split().await?;
155
156 server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
157 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
158
159 drop(server);
160
161 for _ in 0..1000 {
162 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
163 assert_eq!(data, Vec::from(b"World"));
164 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
165
166 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
167 assert_eq!(data, Vec::from(b"Hello"));
168 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
169
170 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
171 assert_eq!(data, Vec::from(b"Hello"));
172 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
173
174 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
175 assert_eq!(data, Vec::from(b"World"));
176 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
177 }
178
179 assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
181 assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"Hello"));
182
183 drop(server_writer);
184 drop(server_reader);
185
186 match client_reader.recv().await? {
187 None => (),
188 _ => return Err(anyhow!("Failed to close connection gracefully.")),
189 };
190
191 Ok(())
192 }
193
194 #[tokio::test]
195 #[traced_test]
196 async fn test_txrx_zero_byte_data() -> Result<(), anyhow::Error> {
197 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
198 let server = TcpTpktServer::listen(test_address).await?;
199
200 let client_connection = TcpTpktConnection::connect(test_address).await?;
201 let server_connection = server.accept().await?;
202
203 drop(server);
204
205 let (mut client_reader, mut client_writer) = client_connection.split().await?;
206 let (mut server_reader, mut server_writer) = server_connection.split().await?;
207
208 server_writer.send(&mut VecDeque::from_iter(vec![b"".to_vec()])).await?;
209 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
210
211 for _ in 0..1000 {
212 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
213 assert_eq!(data, Vec::from(b"World"));
214 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
215
216 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
217 assert_eq!(data, Vec::from(b""));
218 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
219
220 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
221 assert_eq!(data, Vec::from(b""));
222 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
223
224 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
225 assert_eq!(data, Vec::from(b"World"));
226 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
227 }
228
229 assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
231 assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b""));
232
233 drop(server_writer);
234 drop(server_reader);
235
236 match client_reader.recv().await? {
237 None => (),
238 _ => return Err(anyhow!("Failed to close connection gracefully.")),
239 };
240
241 Ok(())
242 }
243
244 #[tokio::test]
245 #[traced_test]
246 async fn test_txrx_max_byte_data() -> Result<(), anyhow::Error> {
247 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
248 let server = TcpTpktServer::listen(test_address).await?;
249
250 let client_connection = TcpTpktConnection::connect(test_address).await?;
251 let server_connection = server.accept().await?;
252
253 drop(server);
254
255 let mut buffer = [0u8; 65531];
256 rand::rng().fill_bytes(&mut buffer[..]);
257
258 let (mut client_reader, mut client_writer) = client_connection.split().await?;
259 let (mut server_reader, mut server_writer) = server_connection.split().await?;
260
261 server_writer.send(&mut VecDeque::from_iter(vec![buffer.to_vec()])).await?;
262 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
263
264 for _ in 0..1000 {
265 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
266 assert_eq!(data, Vec::from(b"World"));
267 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
268
269 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
270 assert_eq!(data, Vec::from(buffer));
271 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
272
273 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
274 assert_eq!(data, Vec::from(buffer));
275 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
276
277 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
278 assert_eq!(data, Vec::from(b"World"));
279 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
280 }
281
282 assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
284 assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(buffer));
285
286 drop(server_writer);
287 drop(server_reader);
288
289 match client_reader.recv().await? {
290 None => (),
291 _ => return Err(anyhow!("Failed to close connection gracefully.")),
292 };
293
294 Ok(())
295 }
296
297 #[tokio::test]
298 #[traced_test]
299 async fn test_txrx_over_max_byte_data() -> Result<(), anyhow::Error> {
300 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
301 let server = TcpTpktServer::listen(test_address).await?;
302
303 let client_connection = TcpTpktConnection::connect(test_address).await?;
304 let server_connection = server.accept().await?;
305
306 drop(server);
307
308 let mut over_buffer = [0u8; 65532];
309 rand::rng().fill_bytes(&mut over_buffer[..]);
310
311 let (mut client_reader, mut client_writer) = client_connection.split().await?;
312 let (mut server_reader, mut server_writer) = server_connection.split().await?;
313
314 match server_writer.send(&mut VecDeque::from_iter(vec![over_buffer.to_vec()])).await {
315 Ok(_) => assert!(false, "This was expected to fail as it is over the max payload limit"),
316 Err(TpktError::ProtocolError(x)) => assert_eq!(x, "TPKT user data must be less than or equal to 65531 but was 65532"),
317 _ => return Err(anyhow!("Something unexpected happened")),
318 };
319
320 let mut buffer = [0u8; 65531];
322 rand::rng().fill_bytes(&mut buffer[..]);
323 server_writer.send(&mut VecDeque::from_iter(vec![buffer.to_vec()])).await?;
324 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
325
326 for _ in 0..100 {
327 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
328 assert_eq!(data, Vec::from(b"World"));
329 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
330
331 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
332 assert_eq!(data, Vec::from(buffer));
333 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
334
335 let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
336 assert_eq!(data, Vec::from(buffer));
337 server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
338
339 let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
340 assert_eq!(data, Vec::from(b"World"));
341 client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
342 }
343
344 assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
346 assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(buffer));
347
348 drop(server_writer);
349 drop(server_reader);
350
351 match client_reader.recv().await? {
352 None => (),
353 _ => return Err(anyhow!("Failed to close connection gracefully.")),
354 };
355
356 Ok(())
357 }
358
359 #[tokio::test]
360 #[traced_test]
361 async fn test_no_open_socket() -> Result<(), anyhow::Error> {
362 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
363
364 match TcpTpktConnection::connect(test_address).await {
365 Ok(_) => assert!(false, "This was expected to fail as a socket was not opened."),
366 Err(TpktError::IoError(x)) => assert_eq!(x.kind(), ErrorKind::ConnectionRefused),
367 Err(x) => return Err(anyhow!("Something unexpected happened: {:?}", x)),
368 };
369
370 Ok(())
371 }
372}