Skip to main content

rusty_cotp/
lib.rs

1#![doc = include_str!("../README.md")]
2
3mod api;
4mod packet;
5mod parser;
6mod serialiser;
7mod service;
8
9pub use crate::api::*;
10pub use crate::service::*;
11
12#[cfg(test)]
13mod tests {
14    use std::{collections::VecDeque, ops::Range, time::Duration};
15
16    use anyhow::anyhow;
17    use rand::RngCore;
18    use rusty_tpkt::{TcpTpktConnection, TcpTpktReader, TcpTpktServer, TcpTpktWriter, TpktReader, TpktWriter};
19    use tokio::{join, time::timeout};
20    use tracing_test::traced_test;
21
22    use crate::api::{CotpConnection, CotpProtocolInformation, CotpReader, CotpResponder, CotpWriter};
23
24    use super::*;
25
26    #[tokio::test]
27    #[traced_test]
28    async fn it_transfers_data() -> Result<(), anyhow::Error> {
29        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
30
31        let (mut client_read, mut client_writer) = cotp_client.split().await?;
32        let (mut server_read, mut server_writer) = cotp_server.split().await?;
33
34        client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
35        assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "ABCD".as_bytes().to_vec());
36
37        server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
38        assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "EFGH".as_bytes().to_vec());
39
40        Ok(())
41    }
42
43    #[tokio::test]
44    #[traced_test]
45    async fn it_transfers_data_with_tsaps() -> Result<(), anyhow::Error> {
46        let (cotp_client, cotp_server) = create_cotp_connection_pair(Some(vec![1u8, 2u8, 3u8]), Some(vec![4u8, 5u8, 6u8]), Default::default()).await?;
47
48        let (mut client_read, mut client_writer) = cotp_client.split().await?;
49        let (mut server_read, mut server_writer) = cotp_server.split().await?;
50
51        client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
52        assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "ABCD".as_bytes().to_vec());
53
54        server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
55        assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "EFGH".as_bytes().to_vec());
56
57        Ok(())
58    }
59
60    #[tokio::test]
61    #[traced_test]
62    async fn it_transfers_data_over_multiple_segments() -> Result<(), anyhow::Error> {
63        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
64
65        let (mut client_read, mut client_writer) = cotp_client.split().await?;
66        let (mut server_read, mut server_writer) = cotp_server.split().await?;
67
68        let mut over_buffer = [0u8; 100000];
69        rand::rng().fill_bytes(&mut over_buffer[..]);
70
71        for _ in 0..10 {
72            client_writer.send(&mut VecDeque::from(vec![over_buffer.to_vec()])).await?;
73            assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, over_buffer.to_vec());
74
75            server_writer.send(&mut VecDeque::from(vec![over_buffer.to_vec()])).await?;
76            assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, over_buffer.to_vec());
77        }
78
79        Ok(())
80    }
81
82    #[tokio::test]
83    #[traced_test]
84    async fn it_fails_on_max_reassembled_payload_exceeded() -> Result<(), anyhow::Error> {
85        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
86
87        let (_, mut client_writer) = cotp_client.split().await?;
88        let (mut server_read, _) = cotp_server.split().await?;
89
90        let mut over_buffer = [0u8; 1024];
91        let mut data_buffer = Vec::new();
92        for _ in 1..10000 {
93            rand::rng().fill_bytes(&mut over_buffer[..]);
94            data_buffer.extend_from_slice(&over_buffer);
95        }
96
97        match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
98            Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
99            Err(_) => (),
100        }
101        match timeout(Duration::from_millis(100), server_read.recv()).await {
102            Ok(Ok(_)) => assert!(false, "Expected that all the payload was not yet send."),
103            Ok(Err(e)) => {
104                assert_eq!(format!("{e}"), "COTP Protocol Error - Reassembled payload size 1051130 exceeds maximum payload size 1049600");
105                return Ok(());
106            }
107            Err(_) => assert!(false, "Expected to failed on buffer exceeded."),
108        }
109
110        Ok(())
111    }
112
113    #[tokio::test]
114    #[traced_test]
115    async fn it_flushes_correctly() -> Result<(), anyhow::Error> {
116        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, CotpConnectionParameters { max_reassembled_payload_size: 32 * 1024 * 1024 }).await?;
117
118        let (mut client_read, mut client_writer) = cotp_client.split().await?;
119        let (mut server_read, mut server_writer) = cotp_server.split().await?;
120
121        let mut over_buffer = [0u8; 1024];
122        let mut data_buffer = Vec::new();
123        for _ in 1..10000 {
124            rand::rng().fill_bytes(&mut over_buffer[..]);
125            data_buffer.extend_from_slice(&over_buffer);
126        }
127
128        match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
129            Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
130            Err(_) => (),
131        }
132        loop {
133            match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::new())).await {
134                Ok(_) => break,
135                Err(_) => (),
136            };
137            match timeout(Duration::from_millis(100), server_read.recv()).await {
138                Ok(Ok(_)) => assert!(false, "Expected that all the payload was not yet send."),
139                Ok(Err(e)) => return Err(e)?,
140                Err(_) => (),
141            }
142        }
143        assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, data_buffer.to_vec());
144        match timeout(Duration::from_millis(100), server_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
145            Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
146            Err(_) => (),
147        }
148        loop {
149            match timeout(Duration::from_millis(100), server_writer.send(&mut VecDeque::new())).await {
150                Ok(_) => break,
151                Err(_) => (),
152            };
153            match timeout(Duration::from_millis(100), client_read.recv()).await {
154                Ok(_) => assert!(false, "Expected that all the payload was not yet send."),
155                Err(_) => (),
156            }
157        }
158        assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, data_buffer.to_vec());
159
160        Ok(())
161    }
162
163    async fn create_cotp_connection_pair(
164        calling_tsap_id: Option<Vec<u8>>,
165        called_tsap_id: Option<Vec<u8>>,
166        connection_parameters: CotpConnectionParameters,
167    ) -> Result<(RustyCotpConnection<impl TpktReader, impl TpktWriter>, impl CotpConnection), anyhow::Error> {
168        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
169
170        let tpkt_listener = TcpTpktServer::listen(test_address).await?;
171        let (tpkt_client, tpkt_server) = join!(TcpTpktConnection::connect(test_address), tpkt_listener.accept());
172
173        let connect_information = CotpProtocolInformation::initiator(calling_tsap_id, called_tsap_id);
174
175        let initiator_connection_parameters = connection_parameters.clone();
176        let initiator_connect_information = connect_information.clone();
177        let (cotp_initiator, cotp_acceptor) = join!(async move { RustyCotpConnection::<TcpTpktReader, TcpTpktWriter>::initiate(tpkt_client?, initiator_connect_information, initiator_connection_parameters).await }, async move {
178            let (acceptor, remote) = RustyCotpResponder::<TcpTpktReader, TcpTpktWriter>::new(tpkt_server?, connection_parameters).await?;
179            assert_eq!(remote, connect_information);
180            acceptor.accept(connect_information).await
181        });
182
183        let cotp_client = cotp_initiator?;
184        let cotp_server = cotp_acceptor?;
185
186        Ok((cotp_client, cotp_server))
187    }
188}