1mod api;
2mod parser;
3mod serialiser;
4mod service;
5
6pub use crate::api::*;
7pub use crate::service::*;
8
9#[cfg(test)]
10mod tests {
11
12 use std::{io::ErrorKind, ops::Range};
13
14 use rand::RngCore;
15 use tracing_test::traced_test;
16
17 use super::*;
18
19 #[tokio::test(flavor = "multi_thread")]
20 #[traced_test]
21 async fn test_txrx_sequential_payloads() -> Result<(), anyhow::Error> {
22 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
23 let server = TcpTpktServer::listen(test_address).await?;
24
25 let (mut client_reader, mut client_writer, mut server_reader, mut server_writer) = {
27 let client_connection = TcpTpktConnection::connect(test_address).await?;
28 let (server_connection, remote_host) = server.accept().await?;
29 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
30
31 let (client_reader, client_writer) = client_connection.split().await?;
32 let (server_reader, server_writer) = server_connection.split().await?;
33 (client_reader, client_writer, server_reader, server_writer)
34 };
35
36 server_writer.send(b"Hello").await?;
37 client_writer.send(b"World").await?;
38
39 drop(server);
40
41 for _ in 0..1000 {
42 match server_reader.recv().await? {
43 TpktRecvResult::Data(x) => {
44 assert_eq!(x, Vec::from(b"World"));
45 server_writer.send(x.as_slice()).await?;
46 }
47 _ => assert!(false, "Connection was unexpectedly closed"),
48 }
49 match client_reader.recv().await? {
50 TpktRecvResult::Data(x) => {
51 assert_eq!(x, Vec::from(b"Hello"));
52 client_writer.send(x.as_slice()).await?;
53 }
54 _ => assert!(false, "Connection was unexpectedly closed"),
55 }
56 match server_reader.recv().await? {
57 TpktRecvResult::Data(x) => {
58 assert_eq!(x, Vec::from(b"Hello"));
59 server_writer.send(x.as_slice()).await?;
60 }
61 _ => assert!(false, "Connection was unexpectedly closed"),
62 }
63 match client_reader.recv().await? {
64 TpktRecvResult::Data(x) => {
65 assert_eq!(x, Vec::from(b"World"));
66 client_writer.send(x.as_slice()).await?;
67 }
68 _ => assert!(false, "Connection was unexpectedly closed"),
69 }
70 }
71
72 match server_reader.recv().await? {
74 TpktRecvResult::Data(x) => {
75 assert_eq!(x, Vec::from(b"World"));
76 }
77 _ => assert!(false, "Connection was unexpectedly closed"),
78 }
79 match client_reader.recv().await? {
80 TpktRecvResult::Data(x) => {
81 assert_eq!(x, Vec::from(b"Hello"));
82 }
83 _ => assert!(false, "Connection was unexpectedly closed"),
84 }
85
86 drop(server_writer);
87 drop(server_reader);
88
89 match client_reader.recv().await? {
90 TpktRecvResult::Closed => (),
91 _ => assert!(false, "Failed to close connection gracefully."),
92 };
93
94 Ok(())
95 }
96
97 #[tokio::test]
98 #[traced_test]
99 async fn test_txrx_concurrent_payloads() -> Result<(), anyhow::Error> {
100 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
101 let server = TcpTpktServer::listen(test_address).await?;
102
103 let client_connection = TcpTpktConnection::connect(test_address).await?;
104 let (server_connection, remote_host) = server.accept().await?;
105 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
106
107 let (mut client_reader, mut client_writer) = client_connection.split().await?;
108 let (mut server_reader, mut server_writer) = server_connection.split().await?;
109
110 server_writer.send(b"Hello").await?;
111 server_writer.send(b"World").await?;
112
113 drop(server);
114
115 for _ in 0..1000 {
116 match client_reader.recv().await? {
117 TpktRecvResult::Data(x) => {
118 assert_eq!(x, Vec::from(b"Hello"));
119 client_writer.send(x.as_slice()).await?;
120 }
121 _ => panic!("Connection was unexpectedly closed"),
122 }
123 match client_reader.recv().await? {
124 TpktRecvResult::Data(x) => {
125 assert_eq!(x, Vec::from(b"World"));
126 client_writer.send(x.as_slice()).await?;
127 }
128 _ => panic!("Connection was unexpectedly closed"),
129 }
130 match server_reader.recv().await? {
131 TpktRecvResult::Data(x) => {
132 assert_eq!(x, Vec::from(b"Hello"));
133 server_writer.send(x.as_slice()).await?;
134 }
135 _ => panic!("Connection was unexpectedly closed"),
136 }
137 match server_reader.recv().await? {
138 TpktRecvResult::Data(x) => {
139 assert_eq!(x, Vec::from(b"World"));
140 server_writer.send(x.as_slice()).await?;
141 }
142 _ => panic!("Connection was unexpectedly closed"),
143 }
144 }
145
146 match client_reader.recv().await? {
147 TpktRecvResult::Data(x) => {
148 assert_eq!(x, Vec::from(b"Hello"));
149 }
150 _ => panic!("Connection was unexpectedly closed"),
151 }
152 match client_reader.recv().await? {
153 TpktRecvResult::Data(x) => {
154 assert_eq!(x, Vec::from(b"World"));
155 }
156 _ => panic!("Connection was unexpectedly closed"),
157 }
158
159 drop(client_writer);
160 drop(client_reader);
161
162 match server_reader.recv().await? {
164 TpktRecvResult::Closed => (),
165 _ => assert!(false, "Failed to close connection gracefully."),
166 };
167
168 Ok(())
169 }
170
171 #[tokio::test]
172 #[traced_test]
173 async fn test_txrx_sequential_ungraceful_shutdown() -> Result<(), anyhow::Error> {
174 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
175 let server = TcpTpktServer::listen(test_address).await?;
176
177 let client_connection = TcpTpktConnection::connect(test_address).await?;
178 let (server_connection, remote_host) = server.accept().await?;
179 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
180
181 let (mut client_reader, mut client_writer) = client_connection.split().await?;
182 let (mut server_reader, mut server_writer) = server_connection.split().await?;
183
184 server_writer.send(b"Hello").await?;
185 client_writer.send(b"World").await?;
186
187 drop(server);
188
189 for _ in 0..1000 {
190 match server_reader.recv().await? {
191 TpktRecvResult::Data(x) => {
192 assert_eq!(x, Vec::from(b"World"));
193 server_writer.send(x.as_slice()).await?;
194 }
195 _ => assert!(false, "Connection was unexpectedly closed"),
196 }
197 match client_reader.recv().await? {
198 TpktRecvResult::Data(x) => {
199 assert_eq!(x, Vec::from(b"Hello"));
200 client_writer.send(x.as_slice()).await?;
201 }
202 _ => assert!(false, "Connection was unexpectedly closed"),
203 }
204 match server_reader.recv().await? {
205 TpktRecvResult::Data(x) => {
206 assert_eq!(x, Vec::from(b"Hello"));
207 server_writer.send(x.as_slice()).await?;
208 }
209 _ => assert!(false, "Connection was unexpectedly closed"),
210 }
211 match client_reader.recv().await? {
212 TpktRecvResult::Data(x) => {
213 assert_eq!(x, Vec::from(b"World"));
214 client_writer.send(x.as_slice()).await?;
215 }
216 _ => assert!(false, "Connection was unexpectedly closed"),
217 }
218 }
219
220 match server_reader.recv().await? {
222 TpktRecvResult::Data(x) => {
223 assert_eq!(x, Vec::from(b"World"));
224 }
225 _ => assert!(false, "Connection was unexpectedly closed"),
226 }
227 match client_reader.recv().await? {
228 TpktRecvResult::Data(x) => {
229 assert_eq!(x, Vec::from(b"Hello"));
230 }
231 _ => assert!(false, "Connection was unexpectedly closed"),
232 }
233
234 drop(server_writer);
235 drop(server_reader);
236
237 match client_reader.recv().await? {
238 TpktRecvResult::Closed => (),
239 _ => assert!(false, "Failed to close connection gracefully."),
240 };
241
242 Ok(())
243 }
244
245 #[tokio::test]
246 #[traced_test]
247 async fn test_txrx_zero_byte_data() -> Result<(), anyhow::Error> {
248 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
249 let server = TcpTpktServer::listen(test_address).await?;
250
251 let client_connection = TcpTpktConnection::connect(test_address).await?;
252 let (server_connection, remote_host) = server.accept().await?;
253 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
254
255 drop(server);
256
257 let (mut client_reader, mut client_writer) = client_connection.split().await?;
258 let (mut server_reader, mut server_writer) = server_connection.split().await?;
259
260 server_writer.send(b"").await?;
261 client_writer.send(b"World").await?;
262
263 for _ in 0..1000 {
264 match server_reader.recv().await? {
265 TpktRecvResult::Data(x) => {
266 assert_eq!(x, Vec::from(b"World"));
267 server_writer.send(x.as_slice()).await?;
268 }
269 _ => assert!(false, "Connection was unexpectedly closed"),
270 }
271 match client_reader.recv().await? {
272 TpktRecvResult::Data(x) => {
273 assert_eq!(x, Vec::from(b""));
274 client_writer.send(x.as_slice()).await?;
275 }
276 _ => assert!(false, "Connection was unexpectedly closed"),
277 }
278 match server_reader.recv().await? {
279 TpktRecvResult::Data(x) => {
280 assert_eq!(x, Vec::from(b""));
281 server_writer.send(x.as_slice()).await?;
282 }
283 _ => assert!(false, "Connection was unexpectedly closed"),
284 }
285 match client_reader.recv().await? {
286 TpktRecvResult::Data(x) => {
287 assert_eq!(x, Vec::from(b"World"));
288 client_writer.send(x.as_slice()).await?;
289 }
290 _ => assert!(false, "Connection was unexpectedly closed"),
291 }
292 }
293
294 match server_reader.recv().await? {
296 TpktRecvResult::Data(x) => {
297 assert_eq!(x, Vec::from(b"World"));
298 }
299 _ => assert!(false, "Connection was unexpectedly closed"),
300 }
301 match client_reader.recv().await? {
302 TpktRecvResult::Data(x) => {
303 assert_eq!(x, Vec::from(b""));
304 }
305 _ => assert!(false, "Connection was unexpectedly closed"),
306 }
307
308 drop(server_writer);
309 drop(server_reader);
310
311 match client_reader.recv().await? {
312 TpktRecvResult::Closed => (),
313 _ => assert!(false, "Failed to close connection gracefully."),
314 };
315
316 Ok(())
317 }
318
319 #[tokio::test]
320 #[traced_test]
321 async fn test_txrx_max_byte_data() -> Result<(), anyhow::Error> {
322 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
323 let server = TcpTpktServer::listen(test_address).await?;
324
325 let client_connection = TcpTpktConnection::connect(test_address).await?;
326 let (server_connection, remote_host) = server.accept().await?;
327 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
328
329 drop(server);
330
331 let mut buffer = [0u8; 65531];
332 rand::rng().fill_bytes(&mut buffer[..]);
333
334 let (mut client_reader, mut client_writer) = client_connection.split().await?;
335 let (mut server_reader, mut server_writer) = server_connection.split().await?;
336
337 server_writer.send(&buffer).await?;
338 client_writer.send(b"World").await?;
339
340 for _ in 0..1000 {
341 match server_reader.recv().await? {
342 TpktRecvResult::Data(x) => {
343 assert_eq!(x, Vec::from(b"World"));
344 server_writer.send(x.as_slice()).await?;
345 }
346 _ => assert!(false, "Connection was unexpectedly closed"),
347 }
348 match client_reader.recv().await? {
349 TpktRecvResult::Data(x) => {
350 assert_eq!(x, Vec::from(buffer));
351 client_writer.send(x.as_slice()).await?;
352 }
353 _ => assert!(false, "Connection was unexpectedly closed"),
354 }
355 match server_reader.recv().await? {
356 TpktRecvResult::Data(x) => {
357 assert_eq!(x, Vec::from(buffer));
358 server_writer.send(x.as_slice()).await?;
359 }
360 _ => assert!(false, "Connection was unexpectedly closed"),
361 }
362 match client_reader.recv().await? {
363 TpktRecvResult::Data(x) => {
364 assert_eq!(x, Vec::from(b"World"));
365 client_writer.send(x.as_slice()).await?;
366 }
367 _ => assert!(false, "Connection was unexpectedly closed"),
368 }
369 }
370
371 match server_reader.recv().await? {
373 TpktRecvResult::Data(x) => {
374 assert_eq!(x, Vec::from(b"World"));
375 }
376 _ => assert!(false, "Connection was unexpectedly closed"),
377 }
378 match client_reader.recv().await? {
379 TpktRecvResult::Data(x) => {
380 assert_eq!(x, Vec::from(buffer));
381 }
382 _ => assert!(false, "Connection was unexpectedly closed"),
383 }
384
385 drop(server_writer);
386 drop(server_reader);
387
388 match client_reader.recv().await? {
389 TpktRecvResult::Closed => (),
390 _ => assert!(false, "Failed to close connection gracefully."),
391 };
392
393 Ok(())
394 }
395
396 #[tokio::test]
397 #[traced_test]
398 async fn test_txrx_over_max_byte_data() -> Result<(), anyhow::Error> {
399 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
400 let server = TcpTpktServer::listen(test_address).await?;
401
402 let client_connection = TcpTpktConnection::connect(test_address).await?;
403 let (server_connection, remote_host) = server.accept().await?;
404 assert!(remote_host.to_string().starts_with("127.0.0.1:"));
405
406 drop(server);
407
408 let mut over_buffer = [0u8; 65532];
409 rand::rng().fill_bytes(&mut over_buffer[..]);
410
411 let (mut client_reader, mut client_writer) = client_connection.split().await?;
412 let (mut server_reader, mut server_writer) = server_connection.split().await?;
413
414 match server_writer.send(&over_buffer).await {
415 Ok(_) => assert!(false, "This was expected to fail as it is over the max payload limit"),
416 Err(TpktError::ProtocolError(x)) => assert_eq!(x, "TPKT user data must be less than or equal to 65531 but was 65532"),
417 _ => assert!(false, "Something unexpected happened"),
418 };
419
420 let mut buffer = [0u8; 65531];
422 rand::rng().fill_bytes(&mut buffer[..]);
423 server_writer.send(&buffer).await?;
424 client_writer.send(b"World").await?;
425
426 for _ in 0..100 {
427 match server_reader.recv().await? {
428 TpktRecvResult::Data(x) => {
429 assert_eq!(x, Vec::from(b"World"));
430 server_writer.send(x.as_slice()).await?;
431 }
432 _ => assert!(false, "Connection was unexpectedly closed"),
433 }
434 match client_reader.recv().await? {
435 TpktRecvResult::Data(x) => {
436 assert_eq!(x, Vec::from(buffer));
437 client_writer.send(x.as_slice()).await?;
438 }
439 _ => assert!(false, "Connection was unexpectedly closed"),
440 }
441 match server_reader.recv().await? {
442 TpktRecvResult::Data(x) => {
443 assert_eq!(x, Vec::from(buffer));
444 server_writer.send(x.as_slice()).await?;
445 }
446 _ => assert!(false, "Connection was unexpectedly closed"),
447 }
448 match client_reader.recv().await? {
449 TpktRecvResult::Data(x) => {
450 assert_eq!(x, Vec::from(b"World"));
451 client_writer.send(x.as_slice()).await?;
452 }
453 _ => assert!(false, "Connection was unexpectedly closed"),
454 }
455 }
456
457 match server_reader.recv().await? {
459 TpktRecvResult::Data(x) => {
460 assert_eq!(x, Vec::from(b"World"));
461 }
462 _ => assert!(false, "Connection was unexpectedly closed"),
463 }
464 match client_reader.recv().await? {
465 TpktRecvResult::Data(x) => {
466 assert_eq!(x, Vec::from(buffer));
467 }
468 _ => assert!(false, "Connection was unexpectedly closed"),
469 }
470
471 drop(server_writer);
472 drop(server_reader);
473
474 match client_reader.recv().await? {
475 TpktRecvResult::Closed => (),
476 _ => assert!(false, "Failed to close connection gracefully."),
477 };
478
479 Ok(())
480 }
481
482 #[tokio::test]
483 #[traced_test]
484 async fn test_no_open_socket() -> Result<(), anyhow::Error> {
485 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
486
487 match TcpTpktConnection::connect(test_address).await {
488 Ok(_) => assert!(false, "This was expected to fail as a socket was not opened."),
489 Err(TpktError::IoError(x)) => assert_eq!(x.kind(), ErrorKind::ConnectionRefused),
490 Err(x) => assert!(false, "Something unexpected happened: {:?}", x),
491 };
492
493 Ok(())
494 }
495}