Skip to main content

rusty_cotp/
lib.rs

1mod api;
2mod packet;
3mod parser;
4mod serialiser;
5mod service;
6
7pub use crate::api::*;
8pub use crate::service::*;
9
10#[cfg(test)]
11mod tests {
12    use std::{collections::VecDeque, ops::Range, time::Duration};
13
14    use anyhow::anyhow;
15    use rand::RngCore;
16    use rusty_tpkt::{TcpTpktConnection, TcpTpktReader, TcpTpktServer, TcpTpktWriter, TpktReader, TpktWriter};
17    use tokio::{join, time::timeout};
18    use tracing_test::traced_test;
19
20    use crate::api::{CotpConnection, CotpProtocolInformation, CotpReader, CotpResponder, CotpWriter};
21
22    use super::*;
23
24    #[tokio::test]
25    #[traced_test]
26    async fn it_transfers_data() -> Result<(), anyhow::Error> {
27        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
28
29        let (mut client_read, mut client_writer) = cotp_client.split().await?;
30        let (mut server_read, mut server_writer) = cotp_server.split().await?;
31
32        client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
33        assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "ABCD".as_bytes().to_vec());
34
35        server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
36        assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "EFGH".as_bytes().to_vec());
37
38        Ok(())
39    }
40
41    #[tokio::test]
42    #[traced_test]
43    async fn it_transfers_data_with_tsaps() -> Result<(), anyhow::Error> {
44        let (cotp_client, cotp_server) = create_cotp_connection_pair(Some(vec![1u8, 2u8, 3u8]), Some(vec![4u8, 5u8, 6u8]), Default::default()).await?;
45
46        let (mut client_read, mut client_writer) = cotp_client.split().await?;
47        let (mut server_read, mut server_writer) = cotp_server.split().await?;
48
49        client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
50        assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "ABCD".as_bytes().to_vec());
51
52        server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
53        assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "EFGH".as_bytes().to_vec());
54
55        Ok(())
56    }
57
58    #[tokio::test]
59    #[traced_test]
60    async fn it_transfers_data_over_multiple_segments() -> Result<(), anyhow::Error> {
61        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
62
63        let (mut client_read, mut client_writer) = cotp_client.split().await?;
64        let (mut server_read, mut server_writer) = cotp_server.split().await?;
65
66        let mut over_buffer = [0u8; 100000];
67        rand::rng().fill_bytes(&mut over_buffer[..]);
68
69        for _ in 0..10 {
70            client_writer.send(&mut VecDeque::from(vec![over_buffer.to_vec()])).await?;
71            assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, over_buffer.to_vec());
72
73            server_writer.send(&mut VecDeque::from(vec![over_buffer.to_vec()])).await?;
74            assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, over_buffer.to_vec());
75        }
76
77        Ok(())
78    }
79
80    
81    #[tokio::test]
82    #[traced_test]
83    async fn it_fails_on_max_reassembled_payload_exceeded() -> Result<(), anyhow::Error> {
84        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
85
86        let (_, mut client_writer) = cotp_client.split().await?;
87        let (mut server_read, _) = cotp_server.split().await?;
88
89        let mut over_buffer = [0u8; 1024];
90        let mut data_buffer = Vec::new();
91        for _ in 1..10000 {
92            rand::rng().fill_bytes(&mut over_buffer[..]);
93            data_buffer.extend_from_slice(&over_buffer);
94        }
95
96        match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
97            Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
98            Err(_) => (),
99        }
100        match timeout(Duration::from_millis(100), server_read.recv()).await {
101            Ok(Ok(_)) => assert!(false, "Expected that all the payload was not yet send."),
102            Ok(Err(e)) => {
103                assert_eq!(format!("{e}"), "COTP Protocol Error - Reassembled payload size 1051130 exceeds maximum payload size 1049600");
104                return Ok(())
105            },
106            Err(_) => assert!(false, "Expected to failed on buffer exceeded."),
107        }
108
109        Ok(())
110    }
111
112    #[tokio::test]
113    #[traced_test]
114    async fn it_flushes_correctly() -> Result<(), anyhow::Error> {
115        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, CotpConnectionParameters { max_reassembled_payload_size: 32*1024*1024 }).await?;
116
117        let (mut client_read, mut client_writer) = cotp_client.split().await?;
118        let (mut server_read, mut server_writer) = cotp_server.split().await?;
119
120        let mut over_buffer = [0u8; 1024];
121        let mut data_buffer = Vec::new();
122        for _ in 1..10000 {
123            rand::rng().fill_bytes(&mut over_buffer[..]);
124            data_buffer.extend_from_slice(&over_buffer);
125        }
126
127        match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
128            Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
129            Err(_) => (),
130        }
131        loop {
132            match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::new())).await {
133                Ok(_) => break,
134                Err(_) => (),
135            };
136            match timeout(Duration::from_millis(100), server_read.recv()).await {
137                Ok(Ok(_)) => assert!(false, "Expected that all the payload was not yet send."),
138                Ok(Err(e)) => return Err(e)?,
139                Err(_) => (),
140            }
141        }
142        assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, data_buffer.to_vec());
143        match timeout(Duration::from_millis(100), server_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
144            Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
145            Err(_) => (),
146        }
147        loop {
148            match timeout(Duration::from_millis(100), server_writer.send(&mut VecDeque::new())).await {
149                Ok(_) => break,
150                Err(_) => (),
151            };
152            match timeout(Duration::from_millis(100), client_read.recv()).await {
153                Ok(_) => assert!(false, "Expected that all the payload was not yet send."),
154                Err(_) => (),
155            }
156        }
157        assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, data_buffer.to_vec());
158
159        Ok(())
160    }
161
162    async fn create_cotp_connection_pair(calling_tsap_id: Option<Vec<u8>>, called_tsap_id: Option<Vec<u8>>, connection_parameters: CotpConnectionParameters) -> Result<(RustyCotpConnection<impl TpktReader, impl TpktWriter>, impl CotpConnection), anyhow::Error> {
163        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
164
165        let tpkt_listener = TcpTpktServer::listen(test_address).await?;
166        let (tpkt_client, tpkt_server) = join!(TcpTpktConnection::connect(test_address), tpkt_listener.accept());
167
168        let connect_information = CotpProtocolInformation::initiator(calling_tsap_id, called_tsap_id);
169
170        let initiator_connection_parameters = connection_parameters.clone();
171        let initiator_connect_information = connect_information.clone();
172        let (cotp_initiator, cotp_acceptor) = join!(async move { RustyCotpConnection::<TcpTpktReader, TcpTpktWriter>::initiate(tpkt_client?, initiator_connect_information, initiator_connection_parameters).await }, async move {
173            let (acceptor, remote) = RustyCotpAcceptor::<TcpTpktReader, TcpTpktWriter>::new(tpkt_server?, connection_parameters).await?;
174            assert_eq!(remote, connect_information);
175            acceptor.accept(connect_information).await
176        });
177
178        let cotp_client = cotp_initiator?;
179        let cotp_server = cotp_acceptor?;
180
181        Ok((cotp_client, cotp_server))
182    }
183}