1#![doc = include_str!("../README.md")]
2
3mod api;
4mod packet;
5mod parser;
6mod serialiser;
7mod service;
8
9pub use crate::api::*;
10pub use crate::service::*;
11
12#[cfg(test)]
13mod tests {
14 use std::{collections::VecDeque, ops::Range, time::Duration};
15
16 use anyhow::anyhow;
17 use rand::RngCore;
18 use rusty_tpkt::{TcpTpktConnection, TcpTpktReader, TcpTpktServer, TcpTpktWriter, TpktReader, TpktWriter};
19 use tokio::{join, time::timeout};
20 use tracing_test::traced_test;
21
22 use crate::api::{CotpConnection, CotpProtocolInformation, CotpReader, CotpResponder, CotpWriter};
23
24 use super::*;
25
26 #[tokio::test]
27 #[traced_test]
28 async fn it_transfers_data() -> Result<(), anyhow::Error> {
29 let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
30
31 let (mut client_read, mut client_writer) = cotp_client.split().await?;
32 let (mut server_read, mut server_writer) = cotp_server.split().await?;
33
34 client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
35 assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "ABCD".as_bytes().to_vec());
36
37 server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
38 assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "EFGH".as_bytes().to_vec());
39
40 Ok(())
41 }
42
43 #[tokio::test]
44 #[traced_test]
45 async fn it_transfers_data_with_tsaps() -> Result<(), anyhow::Error> {
46 let (cotp_client, cotp_server) = create_cotp_connection_pair(Some(vec![1u8, 2u8, 3u8]), Some(vec![4u8, 5u8, 6u8]), Default::default()).await?;
47
48 let (mut client_read, mut client_writer) = cotp_client.split().await?;
49 let (mut server_read, mut server_writer) = cotp_server.split().await?;
50
51 client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
52 assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "ABCD".as_bytes().to_vec());
53
54 server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
55 assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "EFGH".as_bytes().to_vec());
56
57 Ok(())
58 }
59
60 #[tokio::test]
61 #[traced_test]
62 async fn it_transfers_data_over_multiple_segments() -> Result<(), anyhow::Error> {
63 let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
64
65 let (mut client_read, mut client_writer) = cotp_client.split().await?;
66 let (mut server_read, mut server_writer) = cotp_server.split().await?;
67
68 let mut over_buffer = [0u8; 100000];
69 rand::rng().fill_bytes(&mut over_buffer[..]);
70
71 for _ in 0..10 {
72 client_writer.send(&mut VecDeque::from(vec![over_buffer.to_vec()])).await?;
73 assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, over_buffer.to_vec());
74
75 server_writer.send(&mut VecDeque::from(vec![over_buffer.to_vec()])).await?;
76 assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, over_buffer.to_vec());
77 }
78
79 Ok(())
80 }
81
82 #[tokio::test]
83 #[traced_test]
84 async fn it_fails_on_max_reassembled_payload_exceeded() -> Result<(), anyhow::Error> {
85 let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, Default::default()).await?;
86
87 let (_, mut client_writer) = cotp_client.split().await?;
88 let (mut server_read, _) = cotp_server.split().await?;
89
90 let mut over_buffer = [0u8; 1024];
91 let mut data_buffer = Vec::new();
92 for _ in 1..10000 {
93 rand::rng().fill_bytes(&mut over_buffer[..]);
94 data_buffer.extend_from_slice(&over_buffer);
95 }
96
97 match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
98 Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
99 Err(_) => (),
100 }
101 match timeout(Duration::from_millis(100), server_read.recv()).await {
102 Ok(Ok(_)) => assert!(false, "Expected that all the payload was not yet send."),
103 Ok(Err(e)) => {
104 assert_eq!(format!("{e}"), "COTP Protocol Error - Reassembled payload size 1051130 exceeds maximum payload size 1049600");
105 return Ok(());
106 }
107 Err(_) => assert!(false, "Expected to failed on buffer exceeded."),
108 }
109
110 Ok(())
111 }
112
113 #[tokio::test]
114 #[traced_test]
115 async fn it_flushes_correctly() -> Result<(), anyhow::Error> {
116 let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None, CotpConnectionParameters { max_reassembled_payload_size: 32 * 1024 * 1024 }).await?;
117
118 let (mut client_read, mut client_writer) = cotp_client.split().await?;
119 let (mut server_read, mut server_writer) = cotp_server.split().await?;
120
121 let mut over_buffer = [0u8; 1024];
122 let mut data_buffer = Vec::new();
123 for _ in 1..10000 {
124 rand::rng().fill_bytes(&mut over_buffer[..]);
125 data_buffer.extend_from_slice(&over_buffer);
126 }
127
128 match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
129 Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
130 Err(_) => (),
131 }
132 loop {
133 match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::new())).await {
134 Ok(_) => break,
135 Err(_) => (),
136 };
137 match timeout(Duration::from_millis(100), server_read.recv()).await {
138 Ok(Ok(_)) => assert!(false, "Expected that all the payload was not yet send."),
139 Ok(Err(e)) => return Err(e)?,
140 Err(_) => (),
141 }
142 }
143 assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, data_buffer.to_vec());
144 match timeout(Duration::from_millis(100), server_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
145 Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
146 Err(_) => (),
147 }
148 loop {
149 match timeout(Duration::from_millis(100), server_writer.send(&mut VecDeque::new())).await {
150 Ok(_) => break,
151 Err(_) => (),
152 };
153 match timeout(Duration::from_millis(100), client_read.recv()).await {
154 Ok(_) => assert!(false, "Expected that all the payload was not yet send."),
155 Err(_) => (),
156 }
157 }
158 assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, data_buffer.to_vec());
159
160 Ok(())
161 }
162
163 async fn create_cotp_connection_pair(
164 calling_tsap_id: Option<Vec<u8>>,
165 called_tsap_id: Option<Vec<u8>>,
166 connection_parameters: CotpConnectionParameters,
167 ) -> Result<(RustyCotpConnection<impl TpktReader, impl TpktWriter>, impl CotpConnection), anyhow::Error> {
168 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
169
170 let tpkt_listener = TcpTpktServer::listen(test_address).await?;
171 let (tpkt_client, tpkt_server) = join!(TcpTpktConnection::connect(test_address), tpkt_listener.accept());
172
173 let connect_information = CotpProtocolInformation::initiator(calling_tsap_id, called_tsap_id);
174
175 let initiator_connection_parameters = connection_parameters.clone();
176 let initiator_connect_information = connect_information.clone();
177 let (cotp_initiator, cotp_acceptor) = join!(async move { RustyCotpConnection::<TcpTpktReader, TcpTpktWriter>::initiate(tpkt_client?, initiator_connect_information, initiator_connection_parameters).await }, async move {
178 let (acceptor, remote) = RustyCotpAcceptor::<TcpTpktReader, TcpTpktWriter>::new(tpkt_server?, connection_parameters).await?;
179 assert_eq!(remote, connect_information);
180 acceptor.accept(connect_information).await
181 });
182
183 let cotp_client = cotp_initiator?;
184 let cotp_server = cotp_acceptor?;
185
186 Ok((cotp_client, cotp_server))
187 }
188}