1#![doc = include_str!("../README.md")]
2
3mod api;
4mod parser;
5mod serialiser;
6mod service;
7
8pub use crate::api::*;
9pub use crate::service::*;
10
11#[cfg(test)]
12mod tests {
13
14 use std::{collections::VecDeque, io::ErrorKind, ops::Range};
15
16 use rand::RngCore;
17 use tracing_test::traced_test;
18
19 use super::*;
20
21 #[tokio::test(flavor = "multi_thread")]
22 #[traced_test]
23 async fn test_txrx_sequential_payloads() -> Result<(), anyhow::Error> {
24 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
25 let server = TcpTpktServer::listen(test_address).await?;
26
27 let (mut client_reader, mut client_writer, mut server_reader, mut server_writer) = {
29 let client_connection = TcpTpktConnection::connect(test_address).await?;
30 let (server_connection, remote_host) = server.accept().await?;
31 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
32
33 let (client_reader, client_writer) = client_connection.split().await?;
34 let (server_reader, server_writer) = server_connection.split().await?;
35 (client_reader, client_writer, server_reader, server_writer)
36 };
37
38 server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
39 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
40
41 drop(server);
42
43 for _ in 0..1000 {
44 match server_reader.recv().await? {
45 TpktRecvResult::Data(x) => {
46 assert_eq!(x, Vec::from(b"World"));
47 server_writer.send(&mut VecDeque::from(vec![x.to_vec()])).await?;
48 }
49 _ => assert!(false, "Connection was unexpectedly closed"),
50 }
51 match client_reader.recv().await? {
52 TpktRecvResult::Data(x) => {
53 assert_eq!(x, Vec::from(b"Hello"));
54 client_writer.send(&mut VecDeque::from(vec![x.to_vec()])).await?;
55 }
56 _ => assert!(false, "Connection was unexpectedly closed"),
57 }
58 match server_reader.recv().await? {
59 TpktRecvResult::Data(x) => {
60 assert_eq!(x, Vec::from(b"Hello"));
61 server_writer.send(&mut VecDeque::from(vec![x.to_vec()])).await?;
62 }
63 _ => assert!(false, "Connection was unexpectedly closed"),
64 }
65 match client_reader.recv().await? {
66 TpktRecvResult::Data(x) => {
67 assert_eq!(x, Vec::from(b"World"));
68 client_writer.send(&mut VecDeque::from(vec![x.to_vec()])).await?;
69 }
70 _ => assert!(false, "Connection was unexpectedly closed"),
71 }
72 }
73
74 match server_reader.recv().await? {
76 TpktRecvResult::Data(x) => {
77 assert_eq!(x, Vec::from(b"World"));
78 }
79 _ => assert!(false, "Connection was unexpectedly closed"),
80 }
81 match client_reader.recv().await? {
82 TpktRecvResult::Data(x) => {
83 assert_eq!(x, Vec::from(b"Hello"));
84 }
85 _ => assert!(false, "Connection was unexpectedly closed"),
86 }
87
88 drop(server_writer);
89 drop(server_reader);
90
91 match client_reader.recv().await? {
92 TpktRecvResult::Closed => (),
93 _ => assert!(false, "Failed to close connection gracefully."),
94 };
95
96 Ok(())
97 }
98
99 #[tokio::test]
100 #[traced_test]
101 async fn test_txrx_concurrent_payloads() -> Result<(), anyhow::Error> {
102 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
103 let server = TcpTpktServer::listen(test_address).await?;
104
105 let client_connection = TcpTpktConnection::connect(test_address).await?;
106 let (server_connection, remote_host) = server.accept().await?;
107 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
108
109 let (mut client_reader, mut client_writer) = client_connection.split().await?;
110 let (mut server_reader, mut server_writer) = server_connection.split().await?;
111
112 server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
113 server_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
114
115 drop(server);
116
117 for _ in 0..1000 {
118 match client_reader.recv().await? {
119 TpktRecvResult::Data(x) => {
120 assert_eq!(x, Vec::from(b"Hello"));
121 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
122 }
123 _ => panic!("Connection was unexpectedly closed"),
124 }
125 match client_reader.recv().await? {
126 TpktRecvResult::Data(x) => {
127 assert_eq!(x, Vec::from(b"World"));
128 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
129 }
130 _ => panic!("Connection was unexpectedly closed"),
131 }
132 match server_reader.recv().await? {
133 TpktRecvResult::Data(x) => {
134 assert_eq!(x, Vec::from(b"Hello"));
135 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
136 }
137 _ => panic!("Connection was unexpectedly closed"),
138 }
139 match server_reader.recv().await? {
140 TpktRecvResult::Data(x) => {
141 assert_eq!(x, Vec::from(b"World"));
142 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
143 }
144 _ => panic!("Connection was unexpectedly closed"),
145 }
146 }
147
148 match client_reader.recv().await? {
149 TpktRecvResult::Data(x) => {
150 assert_eq!(x, Vec::from(b"Hello"));
151 }
152 _ => panic!("Connection was unexpectedly closed"),
153 }
154 match client_reader.recv().await? {
155 TpktRecvResult::Data(x) => {
156 assert_eq!(x, Vec::from(b"World"));
157 }
158 _ => panic!("Connection was unexpectedly closed"),
159 }
160
161 drop(client_writer);
162 drop(client_reader);
163
164 match server_reader.recv().await? {
166 TpktRecvResult::Closed => (),
167 _ => assert!(false, "Failed to close connection gracefully."),
168 };
169
170 Ok(())
171 }
172
173 #[tokio::test]
174 #[traced_test]
175 async fn test_txrx_sequential_ungraceful_shutdown() -> Result<(), anyhow::Error> {
176 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
177 let server = TcpTpktServer::listen(test_address).await?;
178
179 let client_connection = TcpTpktConnection::connect(test_address).await?;
180 let (server_connection, remote_host) = server.accept().await?;
181 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
182
183 let (mut client_reader, mut client_writer) = client_connection.split().await?;
184 let (mut server_reader, mut server_writer) = server_connection.split().await?;
185
186 server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
187 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
188
189 drop(server);
190
191 for _ in 0..1000 {
192 match server_reader.recv().await? {
193 TpktRecvResult::Data(x) => {
194 assert_eq!(x, Vec::from(b"World"));
195 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
196 }
197 _ => assert!(false, "Connection was unexpectedly closed"),
198 }
199 match client_reader.recv().await? {
200 TpktRecvResult::Data(x) => {
201 assert_eq!(x, Vec::from(b"Hello"));
202 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
203 }
204 _ => assert!(false, "Connection was unexpectedly closed"),
205 }
206 match server_reader.recv().await? {
207 TpktRecvResult::Data(x) => {
208 assert_eq!(x, Vec::from(b"Hello"));
209 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
210 }
211 _ => assert!(false, "Connection was unexpectedly closed"),
212 }
213 match client_reader.recv().await? {
214 TpktRecvResult::Data(x) => {
215 assert_eq!(x, Vec::from(b"World"));
216 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
217 }
218 _ => assert!(false, "Connection was unexpectedly closed"),
219 }
220 }
221
222 match server_reader.recv().await? {
224 TpktRecvResult::Data(x) => {
225 assert_eq!(x, Vec::from(b"World"));
226 }
227 _ => assert!(false, "Connection was unexpectedly closed"),
228 }
229 match client_reader.recv().await? {
230 TpktRecvResult::Data(x) => {
231 assert_eq!(x, Vec::from(b"Hello"));
232 }
233 _ => assert!(false, "Connection was unexpectedly closed"),
234 }
235
236 drop(server_writer);
237 drop(server_reader);
238
239 match client_reader.recv().await? {
240 TpktRecvResult::Closed => (),
241 _ => assert!(false, "Failed to close connection gracefully."),
242 };
243
244 Ok(())
245 }
246
247 #[tokio::test]
248 #[traced_test]
249 async fn test_txrx_zero_byte_data() -> Result<(), anyhow::Error> {
250 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
251 let server = TcpTpktServer::listen(test_address).await?;
252
253 let client_connection = TcpTpktConnection::connect(test_address).await?;
254 let (server_connection, remote_host) = server.accept().await?;
255 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
256
257 drop(server);
258
259 let (mut client_reader, mut client_writer) = client_connection.split().await?;
260 let (mut server_reader, mut server_writer) = server_connection.split().await?;
261
262 server_writer.send(&mut VecDeque::from_iter(vec![b"".to_vec()])).await?;
263 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
264
265 for _ in 0..1000 {
266 match server_reader.recv().await? {
267 TpktRecvResult::Data(x) => {
268 assert_eq!(x, Vec::from(b"World"));
269 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
270 }
271 _ => assert!(false, "Connection was unexpectedly closed"),
272 }
273 match client_reader.recv().await? {
274 TpktRecvResult::Data(x) => {
275 assert_eq!(x, Vec::from(b""));
276 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
277 }
278 _ => assert!(false, "Connection was unexpectedly closed"),
279 }
280 match server_reader.recv().await? {
281 TpktRecvResult::Data(x) => {
282 assert_eq!(x, Vec::from(b""));
283 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
284 }
285 _ => assert!(false, "Connection was unexpectedly closed"),
286 }
287 match client_reader.recv().await? {
288 TpktRecvResult::Data(x) => {
289 assert_eq!(x, Vec::from(b"World"));
290 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
291 }
292 _ => assert!(false, "Connection was unexpectedly closed"),
293 }
294 }
295
296 match server_reader.recv().await? {
298 TpktRecvResult::Data(x) => {
299 assert_eq!(x, Vec::from(b"World"));
300 }
301 _ => assert!(false, "Connection was unexpectedly closed"),
302 }
303 match client_reader.recv().await? {
304 TpktRecvResult::Data(x) => {
305 assert_eq!(x, Vec::from(b""));
306 }
307 _ => assert!(false, "Connection was unexpectedly closed"),
308 }
309
310 drop(server_writer);
311 drop(server_reader);
312
313 match client_reader.recv().await? {
314 TpktRecvResult::Closed => (),
315 _ => assert!(false, "Failed to close connection gracefully."),
316 };
317
318 Ok(())
319 }
320
321 #[tokio::test]
322 #[traced_test]
323 async fn test_txrx_max_byte_data() -> Result<(), anyhow::Error> {
324 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
325 let server = TcpTpktServer::listen(test_address).await?;
326
327 let client_connection = TcpTpktConnection::connect(test_address).await?;
328 let (server_connection, remote_host) = server.accept().await?;
329 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
330
331 drop(server);
332
333 let mut buffer = [0u8; 65531];
334 rand::rng().fill_bytes(&mut buffer[..]);
335
336 let (mut client_reader, mut client_writer) = client_connection.split().await?;
337 let (mut server_reader, mut server_writer) = server_connection.split().await?;
338
339 server_writer.send(&mut VecDeque::from_iter(vec![buffer.to_vec()])).await?;
340 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
341
342 for _ in 0..1000 {
343 match server_reader.recv().await? {
344 TpktRecvResult::Data(x) => {
345 assert_eq!(x, Vec::from(b"World"));
346 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
347 }
348 _ => assert!(false, "Connection was unexpectedly closed"),
349 }
350 match client_reader.recv().await? {
351 TpktRecvResult::Data(x) => {
352 assert_eq!(x, Vec::from(buffer));
353 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
354 }
355 _ => assert!(false, "Connection was unexpectedly closed"),
356 }
357 match server_reader.recv().await? {
358 TpktRecvResult::Data(x) => {
359 assert_eq!(x, Vec::from(buffer));
360 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
361 }
362 _ => assert!(false, "Connection was unexpectedly closed"),
363 }
364 match client_reader.recv().await? {
365 TpktRecvResult::Data(x) => {
366 assert_eq!(x, Vec::from(b"World"));
367 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
368 }
369 _ => assert!(false, "Connection was unexpectedly closed"),
370 }
371 }
372
373 match server_reader.recv().await? {
375 TpktRecvResult::Data(x) => {
376 assert_eq!(x, Vec::from(b"World"));
377 }
378 _ => assert!(false, "Connection was unexpectedly closed"),
379 }
380 match client_reader.recv().await? {
381 TpktRecvResult::Data(x) => {
382 assert_eq!(x, Vec::from(buffer));
383 }
384 _ => assert!(false, "Connection was unexpectedly closed"),
385 }
386
387 drop(server_writer);
388 drop(server_reader);
389
390 match client_reader.recv().await? {
391 TpktRecvResult::Closed => (),
392 _ => assert!(false, "Failed to close connection gracefully."),
393 };
394
395 Ok(())
396 }
397
398 #[tokio::test]
399 #[traced_test]
400 async fn test_txrx_over_max_byte_data() -> Result<(), anyhow::Error> {
401 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
402 let server = TcpTpktServer::listen(test_address).await?;
403
404 let client_connection = TcpTpktConnection::connect(test_address).await?;
405 let (server_connection, remote_host) = server.accept().await?;
406 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
407
408 drop(server);
409
410 let mut over_buffer = [0u8; 65532];
411 rand::rng().fill_bytes(&mut over_buffer[..]);
412
413 let (mut client_reader, mut client_writer) = client_connection.split().await?;
414 let (mut server_reader, mut server_writer) = server_connection.split().await?;
415
416 match server_writer.send(&mut VecDeque::from_iter(vec![over_buffer.to_vec()])).await {
417 Ok(_) => assert!(false, "This was expected to fail as it is over the max payload limit"),
418 Err(TpktError::ProtocolError(x)) => assert_eq!(x, "TPKT user data must be less than or equal to 65531 but was 65532"),
419 _ => assert!(false, "Something unexpected happened"),
420 };
421
422 let mut buffer = [0u8; 65531];
424 rand::rng().fill_bytes(&mut buffer[..]);
425 server_writer.send(&mut VecDeque::from_iter(vec![buffer.to_vec()])).await?;
426 client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
427
428 for _ in 0..100 {
429 match server_reader.recv().await? {
430 TpktRecvResult::Data(x) => {
431 assert_eq!(x, Vec::from(b"World"));
432 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
433 }
434 _ => assert!(false, "Connection was unexpectedly closed"),
435 }
436 match client_reader.recv().await? {
437 TpktRecvResult::Data(x) => {
438 assert_eq!(x, Vec::from(buffer));
439 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
440 }
441 _ => assert!(false, "Connection was unexpectedly closed"),
442 }
443 match server_reader.recv().await? {
444 TpktRecvResult::Data(x) => {
445 assert_eq!(x, Vec::from(buffer));
446 server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
447 }
448 _ => assert!(false, "Connection was unexpectedly closed"),
449 }
450 match client_reader.recv().await? {
451 TpktRecvResult::Data(x) => {
452 assert_eq!(x, Vec::from(b"World"));
453 client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
454 }
455 _ => assert!(false, "Connection was unexpectedly closed"),
456 }
457 }
458
459 match server_reader.recv().await? {
461 TpktRecvResult::Data(x) => {
462 assert_eq!(x, Vec::from(b"World"));
463 }
464 _ => assert!(false, "Connection was unexpectedly closed"),
465 }
466 match client_reader.recv().await? {
467 TpktRecvResult::Data(x) => {
468 assert_eq!(x, Vec::from(buffer));
469 }
470 _ => assert!(false, "Connection was unexpectedly closed"),
471 }
472
473 drop(server_writer);
474 drop(server_reader);
475
476 match client_reader.recv().await? {
477 TpktRecvResult::Closed => (),
478 _ => assert!(false, "Failed to close connection gracefully."),
479 };
480
481 Ok(())
482 }
483
484 #[tokio::test]
485 #[traced_test]
486 async fn test_no_open_socket() -> Result<(), anyhow::Error> {
487 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
488
489 match TcpTpktConnection::connect(test_address).await {
490 Ok(_) => assert!(false, "This was expected to fail as a socket was not opened."),
491 Err(TpktError::IoError(x)) => assert_eq!(x.kind(), ErrorKind::ConnectionRefused),
492 Err(x) => assert!(false, "Something unexpected happened: {:?}", x),
493 };
494
495 Ok(())
496 }
497}