Skip to main content

rusty_tpkt/
lib.rs

1#![doc = include_str!("../README.md")]
2
3mod api;
4mod parser;
5mod serialiser;
6mod service;
7
8pub use crate::api::*;
9pub use crate::service::*;
10
11#[cfg(test)]
12mod tests {
13
14    use std::{collections::VecDeque, io::ErrorKind, ops::Range};
15
16    use rand::RngCore;
17    use tracing_test::traced_test;
18
19    use super::*;
20
21    #[tokio::test(flavor = "multi_thread")]
22    #[traced_test]
23    async fn test_txrx_sequential_payloads() -> Result<(), anyhow::Error> {
24        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
25        let server = TcpTpktServer::listen(test_address).await?;
26
27        // This proves we can drop the connection after the split takes place.
28        let (mut client_reader, mut client_writer, mut server_reader, mut server_writer) = {
29            let client_connection = TcpTpktConnection::connect(test_address).await?;
30            let (server_connection, remote_host) = server.accept().await?;
31            assert!(remote_host.to_string().starts_with("127.0.0.1:"));
32
33            let (client_reader, client_writer) = client_connection.split().await?;
34            let (server_reader, server_writer) = server_connection.split().await?;
35            (client_reader, client_writer, server_reader, server_writer)
36        };
37
38        server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
39        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
40
41        drop(server);
42
43        for _ in 0..1000 {
44            match server_reader.recv().await? {
45                TpktRecvResult::Data(x) => {
46                    assert_eq!(x, Vec::from(b"World"));
47                    server_writer.send(&mut VecDeque::from(vec![x.to_vec()])).await?;
48                }
49                _ => assert!(false, "Connection was unexpectedly closed"),
50            }
51            match client_reader.recv().await? {
52                TpktRecvResult::Data(x) => {
53                    assert_eq!(x, Vec::from(b"Hello"));
54                    client_writer.send(&mut VecDeque::from(vec![x.to_vec()])).await?;
55                }
56                _ => assert!(false, "Connection was unexpectedly closed"),
57            }
58            match server_reader.recv().await? {
59                TpktRecvResult::Data(x) => {
60                    assert_eq!(x, Vec::from(b"Hello"));
61                    server_writer.send(&mut VecDeque::from(vec![x.to_vec()])).await?;
62                }
63                _ => assert!(false, "Connection was unexpectedly closed"),
64            }
65            match client_reader.recv().await? {
66                TpktRecvResult::Data(x) => {
67                    assert_eq!(x, Vec::from(b"World"));
68                    client_writer.send(&mut VecDeque::from(vec![x.to_vec()])).await?;
69                }
70                _ => assert!(false, "Connection was unexpectedly closed"),
71            }
72        }
73
74        // Drain connections so they can be gracefully shutdown.
75        match server_reader.recv().await? {
76            TpktRecvResult::Data(x) => {
77                assert_eq!(x, Vec::from(b"World"));
78            }
79            _ => assert!(false, "Connection was unexpectedly closed"),
80        }
81        match client_reader.recv().await? {
82            TpktRecvResult::Data(x) => {
83                assert_eq!(x, Vec::from(b"Hello"));
84            }
85            _ => assert!(false, "Connection was unexpectedly closed"),
86        }
87
88        drop(server_writer);
89        drop(server_reader);
90
91        match client_reader.recv().await? {
92            TpktRecvResult::Closed => (),
93            _ => assert!(false, "Failed to close connection gracefully."),
94        };
95
96        Ok(())
97    }
98
99    #[tokio::test]
100    #[traced_test]
101    async fn test_txrx_concurrent_payloads() -> Result<(), anyhow::Error> {
102        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
103        let server = TcpTpktServer::listen(test_address).await?;
104
105        let client_connection = TcpTpktConnection::connect(test_address).await?;
106        let (server_connection, remote_host) = server.accept().await?;
107        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
108
109        let (mut client_reader, mut client_writer) = client_connection.split().await?;
110        let (mut server_reader, mut server_writer) = server_connection.split().await?;
111
112        server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
113        server_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
114
115        drop(server);
116
117        for _ in 0..1000 {
118            match client_reader.recv().await? {
119                TpktRecvResult::Data(x) => {
120                    assert_eq!(x, Vec::from(b"Hello"));
121                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
122                }
123                _ => panic!("Connection was unexpectedly closed"),
124            }
125            match client_reader.recv().await? {
126                TpktRecvResult::Data(x) => {
127                    assert_eq!(x, Vec::from(b"World"));
128                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
129                }
130                _ => panic!("Connection was unexpectedly closed"),
131            }
132            match server_reader.recv().await? {
133                TpktRecvResult::Data(x) => {
134                    assert_eq!(x, Vec::from(b"Hello"));
135                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
136                }
137                _ => panic!("Connection was unexpectedly closed"),
138            }
139            match server_reader.recv().await? {
140                TpktRecvResult::Data(x) => {
141                    assert_eq!(x, Vec::from(b"World"));
142                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
143                }
144                _ => panic!("Connection was unexpectedly closed"),
145            }
146        }
147
148        match client_reader.recv().await? {
149            TpktRecvResult::Data(x) => {
150                assert_eq!(x, Vec::from(b"Hello"));
151            }
152            _ => panic!("Connection was unexpectedly closed"),
153        }
154        match client_reader.recv().await? {
155            TpktRecvResult::Data(x) => {
156                assert_eq!(x, Vec::from(b"World"));
157            }
158            _ => panic!("Connection was unexpectedly closed"),
159        }
160
161        drop(client_writer);
162        drop(client_reader);
163
164        // Drain connections so they can be gracefully shutdown.
165        match server_reader.recv().await? {
166            TpktRecvResult::Closed => (),
167            _ => assert!(false, "Failed to close connection gracefully."),
168        };
169
170        Ok(())
171    }
172
173    #[tokio::test]
174    #[traced_test]
175    async fn test_txrx_sequential_ungraceful_shutdown() -> Result<(), anyhow::Error> {
176        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
177        let server = TcpTpktServer::listen(test_address).await?;
178
179        let client_connection = TcpTpktConnection::connect(test_address).await?;
180        let (server_connection, remote_host) = server.accept().await?;
181        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
182
183        let (mut client_reader, mut client_writer) = client_connection.split().await?;
184        let (mut server_reader, mut server_writer) = server_connection.split().await?;
185
186        server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
187        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
188
189        drop(server);
190
191        for _ in 0..1000 {
192            match server_reader.recv().await? {
193                TpktRecvResult::Data(x) => {
194                    assert_eq!(x, Vec::from(b"World"));
195                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
196                }
197                _ => assert!(false, "Connection was unexpectedly closed"),
198            }
199            match client_reader.recv().await? {
200                TpktRecvResult::Data(x) => {
201                    assert_eq!(x, Vec::from(b"Hello"));
202                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
203                }
204                _ => assert!(false, "Connection was unexpectedly closed"),
205            }
206            match server_reader.recv().await? {
207                TpktRecvResult::Data(x) => {
208                    assert_eq!(x, Vec::from(b"Hello"));
209                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
210                }
211                _ => assert!(false, "Connection was unexpectedly closed"),
212            }
213            match client_reader.recv().await? {
214                TpktRecvResult::Data(x) => {
215                    assert_eq!(x, Vec::from(b"World"));
216                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
217                }
218                _ => assert!(false, "Connection was unexpectedly closed"),
219            }
220        }
221
222        // Drain connections so they can be gracefully shutdown.
223        match server_reader.recv().await? {
224            TpktRecvResult::Data(x) => {
225                assert_eq!(x, Vec::from(b"World"));
226            }
227            _ => assert!(false, "Connection was unexpectedly closed"),
228        }
229        match client_reader.recv().await? {
230            TpktRecvResult::Data(x) => {
231                assert_eq!(x, Vec::from(b"Hello"));
232            }
233            _ => assert!(false, "Connection was unexpectedly closed"),
234        }
235
236        drop(server_writer);
237        drop(server_reader);
238
239        match client_reader.recv().await? {
240            TpktRecvResult::Closed => (),
241            _ => assert!(false, "Failed to close connection gracefully."),
242        };
243
244        Ok(())
245    }
246
247    #[tokio::test]
248    #[traced_test]
249    async fn test_txrx_zero_byte_data() -> Result<(), anyhow::Error> {
250        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
251        let server = TcpTpktServer::listen(test_address).await?;
252
253        let client_connection = TcpTpktConnection::connect(test_address).await?;
254        let (server_connection, remote_host) = server.accept().await?;
255        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
256
257        drop(server);
258
259        let (mut client_reader, mut client_writer) = client_connection.split().await?;
260        let (mut server_reader, mut server_writer) = server_connection.split().await?;
261
262        server_writer.send(&mut VecDeque::from_iter(vec![b"".to_vec()])).await?;
263        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
264
265        for _ in 0..1000 {
266            match server_reader.recv().await? {
267                TpktRecvResult::Data(x) => {
268                    assert_eq!(x, Vec::from(b"World"));
269                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
270                }
271                _ => assert!(false, "Connection was unexpectedly closed"),
272            }
273            match client_reader.recv().await? {
274                TpktRecvResult::Data(x) => {
275                    assert_eq!(x, Vec::from(b""));
276                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
277                }
278                _ => assert!(false, "Connection was unexpectedly closed"),
279            }
280            match server_reader.recv().await? {
281                TpktRecvResult::Data(x) => {
282                    assert_eq!(x, Vec::from(b""));
283                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
284                }
285                _ => assert!(false, "Connection was unexpectedly closed"),
286            }
287            match client_reader.recv().await? {
288                TpktRecvResult::Data(x) => {
289                    assert_eq!(x, Vec::from(b"World"));
290                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
291                }
292                _ => assert!(false, "Connection was unexpectedly closed"),
293            }
294        }
295
296        // Drain connections so they can be gracefully shutdown.
297        match server_reader.recv().await? {
298            TpktRecvResult::Data(x) => {
299                assert_eq!(x, Vec::from(b"World"));
300            }
301            _ => assert!(false, "Connection was unexpectedly closed"),
302        }
303        match client_reader.recv().await? {
304            TpktRecvResult::Data(x) => {
305                assert_eq!(x, Vec::from(b""));
306            }
307            _ => assert!(false, "Connection was unexpectedly closed"),
308        }
309
310        drop(server_writer);
311        drop(server_reader);
312
313        match client_reader.recv().await? {
314            TpktRecvResult::Closed => (),
315            _ => assert!(false, "Failed to close connection gracefully."),
316        };
317
318        Ok(())
319    }
320
321    #[tokio::test]
322    #[traced_test]
323    async fn test_txrx_max_byte_data() -> Result<(), anyhow::Error> {
324        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
325        let server = TcpTpktServer::listen(test_address).await?;
326
327        let client_connection = TcpTpktConnection::connect(test_address).await?;
328        let (server_connection, remote_host) = server.accept().await?;
329        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
330
331        drop(server);
332
333        let mut buffer = [0u8; 65531];
334        rand::rng().fill_bytes(&mut buffer[..]);
335
336        let (mut client_reader, mut client_writer) = client_connection.split().await?;
337        let (mut server_reader, mut server_writer) = server_connection.split().await?;
338
339        server_writer.send(&mut VecDeque::from_iter(vec![buffer.to_vec()])).await?;
340        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
341
342        for _ in 0..1000 {
343            match server_reader.recv().await? {
344                TpktRecvResult::Data(x) => {
345                    assert_eq!(x, Vec::from(b"World"));
346                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
347                }
348                _ => assert!(false, "Connection was unexpectedly closed"),
349            }
350            match client_reader.recv().await? {
351                TpktRecvResult::Data(x) => {
352                    assert_eq!(x, Vec::from(buffer));
353                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
354                }
355                _ => assert!(false, "Connection was unexpectedly closed"),
356            }
357            match server_reader.recv().await? {
358                TpktRecvResult::Data(x) => {
359                    assert_eq!(x, Vec::from(buffer));
360                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
361                }
362                _ => assert!(false, "Connection was unexpectedly closed"),
363            }
364            match client_reader.recv().await? {
365                TpktRecvResult::Data(x) => {
366                    assert_eq!(x, Vec::from(b"World"));
367                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
368                }
369                _ => assert!(false, "Connection was unexpectedly closed"),
370            }
371        }
372
373        // Drain connections so they can be gracefully shutdown.
374        match server_reader.recv().await? {
375            TpktRecvResult::Data(x) => {
376                assert_eq!(x, Vec::from(b"World"));
377            }
378            _ => assert!(false, "Connection was unexpectedly closed"),
379        }
380        match client_reader.recv().await? {
381            TpktRecvResult::Data(x) => {
382                assert_eq!(x, Vec::from(buffer));
383            }
384            _ => assert!(false, "Connection was unexpectedly closed"),
385        }
386
387        drop(server_writer);
388        drop(server_reader);
389
390        match client_reader.recv().await? {
391            TpktRecvResult::Closed => (),
392            _ => assert!(false, "Failed to close connection gracefully."),
393        };
394
395        Ok(())
396    }
397
398    #[tokio::test]
399    #[traced_test]
400    async fn test_txrx_over_max_byte_data() -> Result<(), anyhow::Error> {
401        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
402        let server = TcpTpktServer::listen(test_address).await?;
403
404        let client_connection = TcpTpktConnection::connect(test_address).await?;
405        let (server_connection, remote_host) = server.accept().await?;
406        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
407
408        drop(server);
409
410        let mut over_buffer = [0u8; 65532];
411        rand::rng().fill_bytes(&mut over_buffer[..]);
412
413        let (mut client_reader, mut client_writer) = client_connection.split().await?;
414        let (mut server_reader, mut server_writer) = server_connection.split().await?;
415
416        match server_writer.send(&mut VecDeque::from_iter(vec![over_buffer.to_vec()])).await {
417            Ok(_) => assert!(false, "This was expected to fail as it is over the max payload limit"),
418            Err(TpktError::ProtocolError(x)) => assert_eq!(x, "TPKT user data must be less than or equal to 65531 but was 65532"),
419            _ => assert!(false, "Something unexpected happened"),
420        };
421
422        // Try again and lets keep going
423        let mut buffer = [0u8; 65531];
424        rand::rng().fill_bytes(&mut buffer[..]);
425        server_writer.send(&mut VecDeque::from_iter(vec![buffer.to_vec()])).await?;
426        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
427
428        for _ in 0..100 {
429            match server_reader.recv().await? {
430                TpktRecvResult::Data(x) => {
431                    assert_eq!(x, Vec::from(b"World"));
432                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
433                }
434                _ => assert!(false, "Connection was unexpectedly closed"),
435            }
436            match client_reader.recv().await? {
437                TpktRecvResult::Data(x) => {
438                    assert_eq!(x, Vec::from(buffer));
439                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
440                }
441                _ => assert!(false, "Connection was unexpectedly closed"),
442            }
443            match server_reader.recv().await? {
444                TpktRecvResult::Data(x) => {
445                    assert_eq!(x, Vec::from(buffer));
446                    server_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
447                }
448                _ => assert!(false, "Connection was unexpectedly closed"),
449            }
450            match client_reader.recv().await? {
451                TpktRecvResult::Data(x) => {
452                    assert_eq!(x, Vec::from(b"World"));
453                    client_writer.send(&mut VecDeque::from_iter(vec![x.to_vec()])).await?;
454                }
455                _ => assert!(false, "Connection was unexpectedly closed"),
456            }
457        }
458
459        // Drain connections so they can be gracefully shutdown.
460        match server_reader.recv().await? {
461            TpktRecvResult::Data(x) => {
462                assert_eq!(x, Vec::from(b"World"));
463            }
464            _ => assert!(false, "Connection was unexpectedly closed"),
465        }
466        match client_reader.recv().await? {
467            TpktRecvResult::Data(x) => {
468                assert_eq!(x, Vec::from(buffer));
469            }
470            _ => assert!(false, "Connection was unexpectedly closed"),
471        }
472
473        drop(server_writer);
474        drop(server_reader);
475
476        match client_reader.recv().await? {
477            TpktRecvResult::Closed => (),
478            _ => assert!(false, "Failed to close connection gracefully."),
479        };
480
481        Ok(())
482    }
483
484    #[tokio::test]
485    #[traced_test]
486    async fn test_no_open_socket() -> Result<(), anyhow::Error> {
487        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
488
489        match TcpTpktConnection::connect(test_address).await {
490            Ok(_) => assert!(false, "This was expected to fail as a socket was not opened."),
491            Err(TpktError::IoError(x)) => assert_eq!(x.kind(), ErrorKind::ConnectionRefused),
492            Err(x) => assert!(false, "Something unexpected happened: {:?}", x),
493        };
494
495        Ok(())
496    }
497}