Skip to main content

rusty_tpkt/
lib.rs

1mod api;
2mod parser;
3mod serialiser;
4mod service;
5
6pub use crate::api::*;
7pub use crate::service::*;
8
9#[cfg(test)]
10mod tests {
11
12    use std::{io::ErrorKind, ops::Range};
13
14    use rand::RngCore;
15    use tracing_test::traced_test;
16
17    use super::*;
18
19    #[tokio::test(flavor = "multi_thread")]
20    #[traced_test]
21    async fn test_txrx_sequential_payloads() -> Result<(), anyhow::Error> {
22        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
23        let server = TcpTpktServer::listen(test_address).await?;
24
25        // This proves we can drop the connection after the split takes place.
26        let (mut client_reader, mut client_writer, mut server_reader, mut server_writer) = {
27            let client_connection = TcpTpktConnection::connect(test_address).await?;
28            let (server_connection, remote_host) = server.accept().await?;
29            assert!(remote_host.to_string().starts_with("127.0.0.1:"));
30
31            let (client_reader, client_writer) = client_connection.split().await?;
32            let (server_reader, server_writer) = server_connection.split().await?;
33            (client_reader, client_writer, server_reader, server_writer)
34        };
35
36        server_writer.send(b"Hello").await?;
37        client_writer.send(b"World").await?;
38
39        drop(server);
40
41        for _ in 0..1000 {
42            match server_reader.recv().await? {
43                TpktRecvResult::Data(x) => {
44                    assert_eq!(x, Vec::from(b"World"));
45                    server_writer.send(x.as_slice()).await?;
46                }
47                _ => assert!(false, "Connection was unexpectedly closed"),
48            }
49            match client_reader.recv().await? {
50                TpktRecvResult::Data(x) => {
51                    assert_eq!(x, Vec::from(b"Hello"));
52                    client_writer.send(x.as_slice()).await?;
53                }
54                _ => assert!(false, "Connection was unexpectedly closed"),
55            }
56            match server_reader.recv().await? {
57                TpktRecvResult::Data(x) => {
58                    assert_eq!(x, Vec::from(b"Hello"));
59                    server_writer.send(x.as_slice()).await?;
60                }
61                _ => assert!(false, "Connection was unexpectedly closed"),
62            }
63            match client_reader.recv().await? {
64                TpktRecvResult::Data(x) => {
65                    assert_eq!(x, Vec::from(b"World"));
66                    client_writer.send(x.as_slice()).await?;
67                }
68                _ => assert!(false, "Connection was unexpectedly closed"),
69            }
70        }
71
72        // Drain connections so they can be gracefully shutdown.
73        match server_reader.recv().await? {
74            TpktRecvResult::Data(x) => {
75                assert_eq!(x, Vec::from(b"World"));
76            }
77            _ => assert!(false, "Connection was unexpectedly closed"),
78        }
79        match client_reader.recv().await? {
80            TpktRecvResult::Data(x) => {
81                assert_eq!(x, Vec::from(b"Hello"));
82            }
83            _ => assert!(false, "Connection was unexpectedly closed"),
84        }
85
86        drop(server_writer);
87        drop(server_reader);
88
89        match client_reader.recv().await? {
90            TpktRecvResult::Closed => (),
91            _ => assert!(false, "Failed to close connection gracefully."),
92        };
93
94        Ok(())
95    }
96
97    #[tokio::test]
98    #[traced_test]
99    async fn test_txrx_concurrent_payloads() -> Result<(), anyhow::Error> {
100        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
101        let server = TcpTpktServer::listen(test_address).await?;
102
103        let client_connection = TcpTpktConnection::connect(test_address).await?;
104        let (server_connection, remote_host) = server.accept().await?;
105        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
106
107        let (mut client_reader, mut client_writer) = client_connection.split().await?;
108        let (mut server_reader, mut server_writer) = server_connection.split().await?;
109
110        server_writer.send(b"Hello").await?;
111        server_writer.send(b"World").await?;
112
113        drop(server);
114
115        for _ in 0..1000 {
116            match client_reader.recv().await? {
117                TpktRecvResult::Data(x) => {
118                    assert_eq!(x, Vec::from(b"Hello"));
119                    client_writer.send(x.as_slice()).await?;
120                }
121                _ => panic!("Connection was unexpectedly closed"),
122            }
123            match client_reader.recv().await? {
124                TpktRecvResult::Data(x) => {
125                    assert_eq!(x, Vec::from(b"World"));
126                    client_writer.send(x.as_slice()).await?;
127                }
128                _ => panic!("Connection was unexpectedly closed"),
129            }
130            match server_reader.recv().await? {
131                TpktRecvResult::Data(x) => {
132                    assert_eq!(x, Vec::from(b"Hello"));
133                    server_writer.send(x.as_slice()).await?;
134                }
135                _ => panic!("Connection was unexpectedly closed"),
136            }
137            match server_reader.recv().await? {
138                TpktRecvResult::Data(x) => {
139                    assert_eq!(x, Vec::from(b"World"));
140                    server_writer.send(x.as_slice()).await?;
141                }
142                _ => panic!("Connection was unexpectedly closed"),
143            }
144        }
145
146        match client_reader.recv().await? {
147            TpktRecvResult::Data(x) => {
148                assert_eq!(x, Vec::from(b"Hello"));
149            }
150            _ => panic!("Connection was unexpectedly closed"),
151        }
152        match client_reader.recv().await? {
153            TpktRecvResult::Data(x) => {
154                assert_eq!(x, Vec::from(b"World"));
155            }
156            _ => panic!("Connection was unexpectedly closed"),
157        }
158
159        drop(client_writer);
160        drop(client_reader);
161
162        // Drain connections so they can be gracefully shutdown.
163        match server_reader.recv().await? {
164            TpktRecvResult::Closed => (),
165            _ => assert!(false, "Failed to close connection gracefully."),
166        };
167
168        Ok(())
169    }
170
171    #[tokio::test]
172    #[traced_test]
173    async fn test_txrx_sequential_ungraceful_shutdown() -> Result<(), anyhow::Error> {
174        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
175        let server = TcpTpktServer::listen(test_address).await?;
176
177        let client_connection = TcpTpktConnection::connect(test_address).await?;
178        let (server_connection, remote_host) = server.accept().await?;
179        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
180
181        let (mut client_reader, mut client_writer) = client_connection.split().await?;
182        let (mut server_reader, mut server_writer) = server_connection.split().await?;
183
184        server_writer.send(b"Hello").await?;
185        client_writer.send(b"World").await?;
186
187        drop(server);
188
189        for _ in 0..1000 {
190            match server_reader.recv().await? {
191                TpktRecvResult::Data(x) => {
192                    assert_eq!(x, Vec::from(b"World"));
193                    server_writer.send(x.as_slice()).await?;
194                }
195                _ => assert!(false, "Connection was unexpectedly closed"),
196            }
197            match client_reader.recv().await? {
198                TpktRecvResult::Data(x) => {
199                    assert_eq!(x, Vec::from(b"Hello"));
200                    client_writer.send(x.as_slice()).await?;
201                }
202                _ => assert!(false, "Connection was unexpectedly closed"),
203            }
204            match server_reader.recv().await? {
205                TpktRecvResult::Data(x) => {
206                    assert_eq!(x, Vec::from(b"Hello"));
207                    server_writer.send(x.as_slice()).await?;
208                }
209                _ => assert!(false, "Connection was unexpectedly closed"),
210            }
211            match client_reader.recv().await? {
212                TpktRecvResult::Data(x) => {
213                    assert_eq!(x, Vec::from(b"World"));
214                    client_writer.send(x.as_slice()).await?;
215                }
216                _ => assert!(false, "Connection was unexpectedly closed"),
217            }
218        }
219
220        // Drain connections so they can be gracefully shutdown.
221        match server_reader.recv().await? {
222            TpktRecvResult::Data(x) => {
223                assert_eq!(x, Vec::from(b"World"));
224            }
225            _ => assert!(false, "Connection was unexpectedly closed"),
226        }
227        match client_reader.recv().await? {
228            TpktRecvResult::Data(x) => {
229                assert_eq!(x, Vec::from(b"Hello"));
230            }
231            _ => assert!(false, "Connection was unexpectedly closed"),
232        }
233
234        drop(server_writer);
235        drop(server_reader);
236
237        match client_reader.recv().await? {
238            TpktRecvResult::Closed => (),
239            _ => assert!(false, "Failed to close connection gracefully."),
240        };
241
242        Ok(())
243    }
244
245    #[tokio::test]
246    #[traced_test]
247    async fn test_txrx_zero_byte_data() -> Result<(), anyhow::Error> {
248        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
249        let server = TcpTpktServer::listen(test_address).await?;
250
251        let client_connection = TcpTpktConnection::connect(test_address).await?;
252        let (server_connection, remote_host) = server.accept().await?;
253        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
254
255        drop(server);
256
257        let (mut client_reader, mut client_writer) = client_connection.split().await?;
258        let (mut server_reader, mut server_writer) = server_connection.split().await?;
259
260        server_writer.send(b"").await?;
261        client_writer.send(b"World").await?;
262
263        for _ in 0..1000 {
264            match server_reader.recv().await? {
265                TpktRecvResult::Data(x) => {
266                    assert_eq!(x, Vec::from(b"World"));
267                    server_writer.send(x.as_slice()).await?;
268                }
269                _ => assert!(false, "Connection was unexpectedly closed"),
270            }
271            match client_reader.recv().await? {
272                TpktRecvResult::Data(x) => {
273                    assert_eq!(x, Vec::from(b""));
274                    client_writer.send(x.as_slice()).await?;
275                }
276                _ => assert!(false, "Connection was unexpectedly closed"),
277            }
278            match server_reader.recv().await? {
279                TpktRecvResult::Data(x) => {
280                    assert_eq!(x, Vec::from(b""));
281                    server_writer.send(x.as_slice()).await?;
282                }
283                _ => assert!(false, "Connection was unexpectedly closed"),
284            }
285            match client_reader.recv().await? {
286                TpktRecvResult::Data(x) => {
287                    assert_eq!(x, Vec::from(b"World"));
288                    client_writer.send(x.as_slice()).await?;
289                }
290                _ => assert!(false, "Connection was unexpectedly closed"),
291            }
292        }
293
294        // Drain connections so they can be gracefully shutdown.
295        match server_reader.recv().await? {
296            TpktRecvResult::Data(x) => {
297                assert_eq!(x, Vec::from(b"World"));
298            }
299            _ => assert!(false, "Connection was unexpectedly closed"),
300        }
301        match client_reader.recv().await? {
302            TpktRecvResult::Data(x) => {
303                assert_eq!(x, Vec::from(b""));
304            }
305            _ => assert!(false, "Connection was unexpectedly closed"),
306        }
307
308        drop(server_writer);
309        drop(server_reader);
310
311        match client_reader.recv().await? {
312            TpktRecvResult::Closed => (),
313            _ => assert!(false, "Failed to close connection gracefully."),
314        };
315
316        Ok(())
317    }
318
319    #[tokio::test]
320    #[traced_test]
321    async fn test_txrx_max_byte_data() -> Result<(), anyhow::Error> {
322        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
323        let server = TcpTpktServer::listen(test_address).await?;
324
325        let client_connection = TcpTpktConnection::connect(test_address).await?;
326        let (server_connection, remote_host) = server.accept().await?;
327        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
328
329        drop(server);
330
331        let mut buffer = [0u8; 65531];
332        rand::rng().fill_bytes(&mut buffer[..]);
333
334        let (mut client_reader, mut client_writer) = client_connection.split().await?;
335        let (mut server_reader, mut server_writer) = server_connection.split().await?;
336
337        server_writer.send(&buffer).await?;
338        client_writer.send(b"World").await?;
339
340        for _ in 0..1000 {
341            match server_reader.recv().await? {
342                TpktRecvResult::Data(x) => {
343                    assert_eq!(x, Vec::from(b"World"));
344                    server_writer.send(x.as_slice()).await?;
345                }
346                _ => assert!(false, "Connection was unexpectedly closed"),
347            }
348            match client_reader.recv().await? {
349                TpktRecvResult::Data(x) => {
350                    assert_eq!(x, Vec::from(buffer));
351                    client_writer.send(x.as_slice()).await?;
352                }
353                _ => assert!(false, "Connection was unexpectedly closed"),
354            }
355            match server_reader.recv().await? {
356                TpktRecvResult::Data(x) => {
357                    assert_eq!(x, Vec::from(buffer));
358                    server_writer.send(x.as_slice()).await?;
359                }
360                _ => assert!(false, "Connection was unexpectedly closed"),
361            }
362            match client_reader.recv().await? {
363                TpktRecvResult::Data(x) => {
364                    assert_eq!(x, Vec::from(b"World"));
365                    client_writer.send(x.as_slice()).await?;
366                }
367                _ => assert!(false, "Connection was unexpectedly closed"),
368            }
369        }
370
371        // Drain connections so they can be gracefully shutdown.
372        match server_reader.recv().await? {
373            TpktRecvResult::Data(x) => {
374                assert_eq!(x, Vec::from(b"World"));
375            }
376            _ => assert!(false, "Connection was unexpectedly closed"),
377        }
378        match client_reader.recv().await? {
379            TpktRecvResult::Data(x) => {
380                assert_eq!(x, Vec::from(buffer));
381            }
382            _ => assert!(false, "Connection was unexpectedly closed"),
383        }
384
385        drop(server_writer);
386        drop(server_reader);
387
388        match client_reader.recv().await? {
389            TpktRecvResult::Closed => (),
390            _ => assert!(false, "Failed to close connection gracefully."),
391        };
392
393        Ok(())
394    }
395
396    #[tokio::test]
397    #[traced_test]
398    async fn test_txrx_over_max_byte_data() -> Result<(), anyhow::Error> {
399        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
400        let server = TcpTpktServer::listen(test_address).await?;
401
402        let client_connection = TcpTpktConnection::connect(test_address).await?;
403        let (server_connection, remote_host) = server.accept().await?;
404        assert!(remote_host.to_string().starts_with("127.0.0.1:"));
405
406        drop(server);
407
408        let mut over_buffer = [0u8; 65532];
409        rand::rng().fill_bytes(&mut over_buffer[..]);
410
411        let (mut client_reader, mut client_writer) = client_connection.split().await?;
412        let (mut server_reader, mut server_writer) = server_connection.split().await?;
413
414        match server_writer.send(&over_buffer).await {
415            Ok(_) => assert!(false, "This was expected to fail as it is over the max payload limit"),
416            Err(TpktError::ProtocolError(x)) => assert_eq!(x, "TPKT user data must be less than or equal to 65531 but was 65532"),
417            _ => assert!(false, "Something unexpected happened"),
418        };
419
420        // Try again and lets keep going
421        let mut buffer = [0u8; 65531];
422        rand::rng().fill_bytes(&mut buffer[..]);
423        server_writer.send(&buffer).await?;
424        client_writer.send(b"World").await?;
425
426        for _ in 0..100 {
427            match server_reader.recv().await? {
428                TpktRecvResult::Data(x) => {
429                    assert_eq!(x, Vec::from(b"World"));
430                    server_writer.send(x.as_slice()).await?;
431                }
432                _ => assert!(false, "Connection was unexpectedly closed"),
433            }
434            match client_reader.recv().await? {
435                TpktRecvResult::Data(x) => {
436                    assert_eq!(x, Vec::from(buffer));
437                    client_writer.send(x.as_slice()).await?;
438                }
439                _ => assert!(false, "Connection was unexpectedly closed"),
440            }
441            match server_reader.recv().await? {
442                TpktRecvResult::Data(x) => {
443                    assert_eq!(x, Vec::from(buffer));
444                    server_writer.send(x.as_slice()).await?;
445                }
446                _ => assert!(false, "Connection was unexpectedly closed"),
447            }
448            match client_reader.recv().await? {
449                TpktRecvResult::Data(x) => {
450                    assert_eq!(x, Vec::from(b"World"));
451                    client_writer.send(x.as_slice()).await?;
452                }
453                _ => assert!(false, "Connection was unexpectedly closed"),
454            }
455        }
456
457        // Drain connections so they can be gracefully shutdown.
458        match server_reader.recv().await? {
459            TpktRecvResult::Data(x) => {
460                assert_eq!(x, Vec::from(b"World"));
461            }
462            _ => assert!(false, "Connection was unexpectedly closed"),
463        }
464        match client_reader.recv().await? {
465            TpktRecvResult::Data(x) => {
466                assert_eq!(x, Vec::from(buffer));
467            }
468            _ => assert!(false, "Connection was unexpectedly closed"),
469        }
470
471        drop(server_writer);
472        drop(server_reader);
473
474        match client_reader.recv().await? {
475            TpktRecvResult::Closed => (),
476            _ => assert!(false, "Failed to close connection gracefully."),
477        };
478
479        Ok(())
480    }
481
482    #[tokio::test]
483    #[traced_test]
484    async fn test_no_open_socket() -> Result<(), anyhow::Error> {
485        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
486
487        match TcpTpktConnection::connect(test_address).await {
488            Ok(_) => assert!(false, "This was expected to fail as a socket was not opened."),
489            Err(TpktError::IoError(x)) => assert_eq!(x.kind(), ErrorKind::ConnectionRefused),
490            Err(x) => assert!(false, "Something unexpected happened: {:?}", x),
491        };
492
493        Ok(())
494    }
495}