1mod api;
2mod packet;
3mod parser;
4mod serialiser;
5mod service;
6
7pub use crate::api::*;
8pub use crate::service::*;
9
#[cfg(test)]
mod tests {
    //! End-to-end tests that connect a COTP initiator to a COTP acceptor over a TPKT
    //! connection on the loopback interface and exchange payloads in both directions.

    use std::{ops::Range, time::Duration};

    use rand::RngCore;
    use rusty_tpkt::{TcpTpktConnection, TcpTpktReader, TcpTpktServer, TcpTpktWriter, TpktReader, TpktWriter};
    use tokio::{join, time::timeout};
    use tracing_test::traced_test;

    use crate::api::{CotpConnectInformation, CotpConnection, CotpReader, CotpResponder, CotpWriter};

    use super::*;

    /// How long a send or receive may stay pending before the flush test treats it as stalled.
    const STALL_TIMEOUT: Duration = Duration::from_millis(100);

    /// Small payload sent from the client to the server.
    const REQUEST: &[u8] = b"ABCD";

    /// Small payload sent from the server back to the client.
    const RESPONSE: &[u8] = b"EFGH";

    /// Number of bytes pushed through the connection by the flush test (9 999 blocks of 1 KiB).
    const FLUSH_PAYLOAD_LEN: usize = 9_999 * 1024;

    /// Asserts that `result` is a data indication carrying exactly `expected`.
    ///
    /// # Panics
    ///
    /// Panics when `result` reports a closed connection or when the received bytes differ
    /// from `expected`. Thanks to `#[track_caller]` the failure is attributed to the calling
    /// test line rather than to this helper.
    #[track_caller]
    fn assert_received(result: api::CotpRecvResult, expected: &[u8]) {
        match result {
            api::CotpRecvResult::Closed => panic!("Connection was unexpectedly closed."),
            api::CotpRecvResult::Data(items) => assert_eq!(items, expected.to_vec()),
        }
    }

    /// A small payload sent in either direction arrives unchanged when no TSAP ids are given.
    #[tokio::test]
    #[traced_test]
    async fn it_transfers_data() -> Result<(), anyhow::Error> {
        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;

        let (mut client_read, mut client_writer) = cotp_client.split().await?;
        let (mut server_read, mut server_writer) = cotp_server.split().await?;

        client_writer.send(REQUEST).await?;
        assert_received(server_read.recv().await?, REQUEST);

        server_writer.send(RESPONSE).await?;
        assert_received(client_read.recv().await?, RESPONSE);

        Ok(())
    }

    /// Same exchange as `it_transfers_data`, but the connection is set up with explicit
    /// calling and called TSAP ids.
    #[tokio::test]
    #[traced_test]
    async fn it_transfers_data_with_tsaps() -> Result<(), anyhow::Error> {
        let calling_tsap_id = Some(vec![1u8, 2u8, 3u8]);
        let called_tsap_id = Some(vec![4u8, 5u8, 6u8]);
        let (cotp_client, cotp_server) = create_cotp_connection_pair(calling_tsap_id, called_tsap_id).await?;

        let (mut client_read, mut client_writer) = cotp_client.split().await?;
        let (mut server_read, mut server_writer) = cotp_server.split().await?;

        client_writer.send(REQUEST).await?;
        assert_received(server_read.recv().await?, REQUEST);

        server_writer.send(RESPONSE).await?;
        assert_received(client_read.recv().await?, RESPONSE);

        Ok(())
    }

    /// A 100 000 byte random payload is sent back and forth ten times and must arrive intact
    /// each time. As the test name indicates, the payload is meant to span multiple segments.
    #[tokio::test]
    #[traced_test]
    async fn it_transfers_data_over_multiple_segments() -> Result<(), anyhow::Error> {
        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;

        let (mut client_read, mut client_writer) = cotp_client.split().await?;
        let (mut server_read, mut server_writer) = cotp_server.split().await?;

        // Heap-allocated so the payload is not embedded in the state of the test's future.
        let mut over_buffer = vec![0u8; 100_000];
        rand::rng().fill_bytes(&mut over_buffer[..]);

        for _ in 0..10 {
            client_writer.send(over_buffer.as_slice()).await?;
            assert_received(server_read.recv().await?, &over_buffer);

            server_writer.send(over_buffer.as_slice()).await?;
            assert_received(client_read.recv().await?, &over_buffer);
        }

        Ok(())
    }

    /// A payload of roughly 10 MB cannot be sent within `STALL_TIMEOUT`; the send is then
    /// driven to completion with `continue_send`, and the receiver must not yield anything
    /// until the sender has finished. The scenario is run client-to-server and then
    /// server-to-client.
    #[tokio::test]
    #[traced_test]
    async fn it_flushes_correctly() -> Result<(), anyhow::Error> {
        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;

        let (mut client_read, mut client_writer) = cotp_client.split().await?;
        let (mut server_read, mut server_writer) = cotp_server.split().await?;

        let mut data_buffer = vec![0u8; FLUSH_PAYLOAD_LEN];
        rand::rng().fill_bytes(&mut data_buffer[..]);

        // Client -> server: the initial send is expected to stall because nobody is reading yet.
        assert!(
            timeout(STALL_TIMEOUT, client_writer.send(data_buffer.as_slice())).await.is_err(),
            "Expected the data to be too large for the buffer."
        );
        loop {
            if let Ok(flushed) = timeout(STALL_TIMEOUT, client_writer.continue_send()).await {
                // Surface a failed flush instead of silently leaving the loop.
                flushed?;
                break;
            }
            // While the sender is still flushing, the receiver must not produce a result.
            assert!(
                timeout(STALL_TIMEOUT, server_read.recv()).await.is_err(),
                "Expected that all the payload was not yet send."
            );
        }
        assert_received(server_read.recv().await?, &data_buffer);

        // Server -> client: the same scenario in the opposite direction.
        assert!(
            timeout(STALL_TIMEOUT, server_writer.send(data_buffer.as_slice())).await.is_err(),
            "Expected the data to be too large for the buffer."
        );
        loop {
            if let Ok(flushed) = timeout(STALL_TIMEOUT, server_writer.continue_send()).await {
                // Surface a failed flush instead of silently leaving the loop.
                flushed?;
                break;
            }
            // While the sender is still flushing, the receiver must not produce a result.
            assert!(
                timeout(STALL_TIMEOUT, client_read.recv()).await.is_err(),
                "Expected that all the payload was not yet send."
            );
        }
        assert_received(client_read.recv().await?, &data_buffer);

        Ok(())
    }

    /// Builds a connected initiator/acceptor pair on a loopback TCP port.
    ///
    /// The optional TSAP ids are placed in the connect information used by the initiator;
    /// every other connect field and all accept information use their `Default` values.
    /// The acceptor side asserts that the connect information it receives equals what the
    /// initiator sent.
    ///
    /// Returns `(client, server)`.
    ///
    /// # Errors
    ///
    /// Returns an error when the address cannot be parsed, the listener cannot be created,
    /// or either side of the TPKT or COTP connection setup fails.
    async fn create_cotp_connection_pair(calling_tsap_id: Option<Vec<u8>>, called_tsap_id: Option<Vec<u8>>) -> Result<(TcpCotpConnection<impl TpktReader, impl TpktWriter>, impl CotpConnection), anyhow::Error> {
        // NOTE(review): the port is picked at random from a fixed range, so two tests running
        // in parallel can collide; binding an OS-assigned port would avoid that if the
        // listener exposes its local address - confirm against the rusty_tpkt API.
        let port = rand::random_range::<u16, Range<u16>>(20000..30000);
        let test_address = format!("127.0.0.1:{}", port).parse()?;

        let tpkt_listener = TcpTpktServer::listen(test_address).await?;
        let (tpkt_client, tpkt_server) = join!(TcpTpktConnection::connect(test_address), tpkt_listener.accept());

        // The TSAP ids are owned by this function, so they are moved rather than cloned.
        let connect_information = CotpConnectInformation {
            calling_tsap_id,
            called_tsap_id,
            ..Default::default()
        };
        let accept_information = CotpAcceptInformation::default();

        // Both halves of the handshake have to make progress concurrently.
        let (cotp_initiator, cotp_acceptor) = join!(
            async { TcpCotpConnection::<TcpTpktReader, TcpTpktWriter>::initiate(tpkt_client?, connect_information.clone()).await },
            async {
                let (acceptor, remote) = TcpCotpAcceptor::<TcpTpktReader, TcpTpktWriter>::new(tpkt_server?.0).await?;
                assert_eq!(remote, connect_information);
                acceptor.accept(accept_information).await
            }
        );

        let cotp_client = cotp_initiator?;
        let cotp_server = cotp_acceptor?;

        Ok((cotp_client, cotp_server))
    }
}