Skip to main content

rusty_tpkt/
lib.rs

1#![doc = include_str!("../README.md")]
2
3mod api;
4mod parser;
5mod serialiser;
6mod service;
7
8pub use crate::api::*;
9pub use crate::service::*;
10
11#[cfg(test)]
12mod tests {
13
14    use std::{
15        any::Any,
16        collections::VecDeque,
17        io::ErrorKind,
18        ops::{Deref, Range},
19    };
20
21    use anyhow::anyhow;
22    use rand::RngCore;
23    use tracing_test::traced_test;
24
25    use super::*;
26
27    #[tokio::test(flavor = "multi_thread")]
28    #[traced_test]
29    async fn test_txrx_sequential_payloads() -> Result<(), anyhow::Error> {
30        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
31        let server = TcpTpktServer::listen(test_address).await?;
32
33        // This proves we can drop the connection after the split takes place.
34        let (mut client_reader, mut client_writer, mut server_reader, mut server_writer) = {
35            let client_connection = TcpTpktConnection::connect(test_address).await?;
36            let server_connection = server.accept().await?;
37            match (server_connection.get_protocol_infomation_list().get(0).ok_or_else(|| anyhow!("Test Failed"))?.deref() as &dyn Any).downcast_ref::<TcpTpktProtocolInformation>() {
38                Some(info) => assert!(info.remote_address.to_string().starts_with("127.0.0.1:")),
39                None => return Err(anyhow!("Test Failed")),
40            };
41
42            let (client_reader, client_writer) = client_connection.split().await?;
43            let (server_reader, server_writer) = server_connection.split().await?;
44            (client_reader, client_writer, server_reader, server_writer)
45        };
46
47        server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
48        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
49
50        drop(server);
51
52        for _ in 0..1000 {
53            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
54            assert_eq!(data, Vec::from(b"Hello"));
55            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
56
57            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
58            assert_eq!(data, Vec::from(b"World"));
59            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
60
61            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
62            assert_eq!(data, Vec::from(b"World"));
63            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
64
65            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
66            assert_eq!(data, Vec::from(b"Hello"));
67            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
68        }
69
70        // Drain connections so they can be gracefully shutdown.
71        assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
72        assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"Hello"));
73
74        drop(server_writer);
75        drop(server_reader);
76
77        match client_reader.recv().await? {
78            None => (),
79            _ => return Err(anyhow!("Failed to close connection gracefully.")),
80        };
81
82        Ok(())
83    }
84
85    #[tokio::test]
86    #[traced_test]
87    async fn test_txrx_concurrent_payloads() -> Result<(), anyhow::Error> {
88        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
89        let server = TcpTpktServer::listen(test_address).await?;
90
91        let client_connection = TcpTpktConnection::connect(test_address).await?;
92        let server_connection = server.accept().await?;
93
94        match (client_connection.get_protocol_infomation_list().get(0).ok_or_else(|| anyhow!("Test Failed"))?.deref() as &dyn Any).downcast_ref::<TcpTpktProtocolInformation>() {
95            Some(info) => assert!(info.remote_address.to_string().starts_with("127.0.0.1:")),
96            None => return Err(anyhow!("Test Failed")),
97        };
98        match (server_connection.get_protocol_infomation_list().get(0).ok_or_else(|| anyhow!("Test Failed"))?.deref() as &dyn Any).downcast_ref::<TcpTpktProtocolInformation>() {
99            Some(info) => assert!(info.remote_address.to_string().starts_with("127.0.0.1:")),
100            None => return Err(anyhow!("Test Failed")),
101        };
102
103        let (mut client_reader, mut client_writer) = client_connection.split().await?;
104        let (mut server_reader, mut server_writer) = server_connection.split().await?;
105
106        server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
107        server_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
108
109        drop(server);
110
111        for _ in 0..1000 {
112            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
113            assert_eq!(data, Vec::from(b"Hello"));
114            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
115
116            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
117            assert_eq!(data, Vec::from(b"World"));
118            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
119
120            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
121            assert_eq!(data, Vec::from(b"Hello"));
122            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
123
124            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
125            assert_eq!(data, Vec::from(b"World"));
126            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
127        }
128
129        assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"Hello"));
130        assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
131
132        drop(client_writer);
133        drop(client_reader);
134
135        // Drain connections so they can be gracefully shutdown.
136        match server_reader.recv().await? {
137            None => (),
138            _ => return Err(anyhow!("Failed to close connection gracefully.")),
139        };
140
141        Ok(())
142    }
143
144    #[tokio::test]
145    #[traced_test]
146    async fn test_txrx_sequential_ungraceful_shutdown() -> Result<(), anyhow::Error> {
147        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
148        let server = TcpTpktServer::listen(test_address).await?;
149
150        let client_connection = TcpTpktConnection::connect(test_address).await?;
151        let server_connection = server.accept().await?;
152
153        let (mut client_reader, mut client_writer) = client_connection.split().await?;
154        let (mut server_reader, mut server_writer) = server_connection.split().await?;
155
156        server_writer.send(&mut VecDeque::from_iter(vec![b"Hello".to_vec()])).await?;
157        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
158
159        drop(server);
160
161        for _ in 0..1000 {
162            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
163            assert_eq!(data, Vec::from(b"World"));
164            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
165
166            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
167            assert_eq!(data, Vec::from(b"Hello"));
168            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
169
170            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
171            assert_eq!(data, Vec::from(b"Hello"));
172            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
173
174            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
175            assert_eq!(data, Vec::from(b"World"));
176            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
177        }
178
179        // Drain connections so they can be gracefully shutdown.
180        assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
181        assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"Hello"));
182
183        drop(server_writer);
184        drop(server_reader);
185
186        match client_reader.recv().await? {
187            None => (),
188            _ => return Err(anyhow!("Failed to close connection gracefully.")),
189        };
190
191        Ok(())
192    }
193
194    #[tokio::test]
195    #[traced_test]
196    async fn test_txrx_zero_byte_data() -> Result<(), anyhow::Error> {
197        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
198        let server = TcpTpktServer::listen(test_address).await?;
199
200        let client_connection = TcpTpktConnection::connect(test_address).await?;
201        let server_connection = server.accept().await?;
202
203        drop(server);
204
205        let (mut client_reader, mut client_writer) = client_connection.split().await?;
206        let (mut server_reader, mut server_writer) = server_connection.split().await?;
207
208        server_writer.send(&mut VecDeque::from_iter(vec![b"".to_vec()])).await?;
209        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
210
211        for _ in 0..1000 {
212            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
213            assert_eq!(data, Vec::from(b"World"));
214            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
215
216            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
217            assert_eq!(data, Vec::from(b""));
218            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
219
220            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
221            assert_eq!(data, Vec::from(b""));
222            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
223
224            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
225            assert_eq!(data, Vec::from(b"World"));
226            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
227        }
228
229        // Drain connections so they can be gracefully shutdown.
230        assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
231        assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b""));
232
233        drop(server_writer);
234        drop(server_reader);
235
236        match client_reader.recv().await? {
237            None => (),
238            _ => return Err(anyhow!("Failed to close connection gracefully.")),
239        };
240
241        Ok(())
242    }
243
244    #[tokio::test]
245    #[traced_test]
246    async fn test_txrx_max_byte_data() -> Result<(), anyhow::Error> {
247        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
248        let server = TcpTpktServer::listen(test_address).await?;
249
250        let client_connection = TcpTpktConnection::connect(test_address).await?;
251        let server_connection = server.accept().await?;
252
253        drop(server);
254
255        let mut buffer = [0u8; 65531];
256        rand::rng().fill_bytes(&mut buffer[..]);
257
258        let (mut client_reader, mut client_writer) = client_connection.split().await?;
259        let (mut server_reader, mut server_writer) = server_connection.split().await?;
260
261        server_writer.send(&mut VecDeque::from_iter(vec![buffer.to_vec()])).await?;
262        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
263
264        for _ in 0..1000 {
265            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
266            assert_eq!(data, Vec::from(b"World"));
267            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
268
269            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
270            assert_eq!(data, Vec::from(buffer));
271            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
272
273            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
274            assert_eq!(data, Vec::from(buffer));
275            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
276
277            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
278            assert_eq!(data, Vec::from(b"World"));
279            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
280        }
281
282        // Drain connections so they can be gracefully shutdown.
283        assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
284        assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(buffer));
285
286        drop(server_writer);
287        drop(server_reader);
288
289        match client_reader.recv().await? {
290            None => (),
291            _ => return Err(anyhow!("Failed to close connection gracefully.")),
292        };
293
294        Ok(())
295    }
296
297    #[tokio::test]
298    #[traced_test]
299    async fn test_txrx_over_max_byte_data() -> Result<(), anyhow::Error> {
300        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
301        let server = TcpTpktServer::listen(test_address).await?;
302
303        let client_connection = TcpTpktConnection::connect(test_address).await?;
304        let server_connection = server.accept().await?;
305
306        drop(server);
307
308        let mut over_buffer = [0u8; 65532];
309        rand::rng().fill_bytes(&mut over_buffer[..]);
310
311        let (mut client_reader, mut client_writer) = client_connection.split().await?;
312        let (mut server_reader, mut server_writer) = server_connection.split().await?;
313
314        match server_writer.send(&mut VecDeque::from_iter(vec![over_buffer.to_vec()])).await {
315            Ok(_) => assert!(false, "This was expected to fail as it is over the max payload limit"),
316            Err(TpktError::ProtocolError(x)) => assert_eq!(x, "TPKT user data must be less than or equal to 65531 but was 65532"),
317            _ => return Err(anyhow!("Something unexpected happened")),
318        };
319
320        // Try again and lets keep going
321        let mut buffer = [0u8; 65531];
322        rand::rng().fill_bytes(&mut buffer[..]);
323        server_writer.send(&mut VecDeque::from_iter(vec![buffer.to_vec()])).await?;
324        client_writer.send(&mut VecDeque::from_iter(vec![b"World".to_vec()])).await?;
325
326        for _ in 0..100 {
327            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
328            assert_eq!(data, Vec::from(b"World"));
329            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
330
331            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
332            assert_eq!(data, Vec::from(buffer));
333            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
334
335            let data = server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
336            assert_eq!(data, Vec::from(buffer));
337            server_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
338
339            let data = client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?;
340            assert_eq!(data, Vec::from(b"World"));
341            client_writer.send(&mut VecDeque::from(vec![data.to_vec()])).await?;
342        }
343
344        // Drain connections so they can be gracefully shutdown.
345        assert_eq!(server_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(b"World"));
346        assert_eq!(client_reader.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, Vec::from(buffer));
347
348        drop(server_writer);
349        drop(server_reader);
350
351        match client_reader.recv().await? {
352            None => (),
353            _ => return Err(anyhow!("Failed to close connection gracefully.")),
354        };
355
356        Ok(())
357    }
358
359    #[tokio::test]
360    #[traced_test]
361    async fn test_no_open_socket() -> Result<(), anyhow::Error> {
362        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
363
364        match TcpTpktConnection::connect(test_address).await {
365            Ok(_) => assert!(false, "This was expected to fail as a socket was not opened."),
366            Err(TpktError::IoError(x)) => assert_eq!(x.kind(), ErrorKind::ConnectionRefused),
367            Err(x) => return Err(anyhow!("Something unexpected happened: {:?}", x)),
368        };
369
370        Ok(())
371    }
372}