1mod api;
2mod packet;
3mod parser;
4mod serialiser;
5mod service;
6
7pub use crate::api::*;
8pub use crate::service::*;
9
10#[cfg(test)]
11mod tests {
12 use std::{collections::VecDeque, ops::Range, time::Duration};
13
14 use anyhow::anyhow;
15 use rand::RngCore;
16 use rusty_tpkt::{TcpTpktConnection, TcpTpktReader, TcpTpktServer, TcpTpktWriter, TpktReader, TpktWriter};
17 use tokio::{join, time::timeout};
18 use tracing_test::traced_test;
19
20 use crate::api::{CotpConnection, CotpProtocolInformation, CotpReader, CotpResponder, CotpWriter};
21
22 use super::*;
23
24 #[tokio::test]
25 #[traced_test]
26 async fn it_transfers_data() -> Result<(), anyhow::Error> {
27 let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;
28
29 let (mut client_read, mut client_writer) = cotp_client.split().await?;
30 let (mut server_read, mut server_writer) = cotp_server.split().await?;
31
32 client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
33 assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "ABCD".as_bytes().to_vec());
34
35 server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
36 assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "EFGH".as_bytes().to_vec());
37
38 Ok(())
39 }
40
41 #[tokio::test]
42 #[traced_test]
43 async fn it_transfers_data_with_tsaps() -> Result<(), anyhow::Error> {
44 let (cotp_client, cotp_server) = create_cotp_connection_pair(Some(vec![1u8, 2u8, 3u8]), Some(vec![4u8, 5u8, 6u8])).await?;
45
46 let (mut client_read, mut client_writer) = cotp_client.split().await?;
47 let (mut server_read, mut server_writer) = cotp_server.split().await?;
48
49 client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
50 assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "ABCD".as_bytes().to_vec());
51
52 server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
53 assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, "EFGH".as_bytes().to_vec());
54
55 Ok(())
56 }
57
58 #[tokio::test]
59 #[traced_test]
60 async fn it_transfers_data_over_multiple_segments() -> Result<(), anyhow::Error> {
61 let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;
62
63 let (mut client_read, mut client_writer) = cotp_client.split().await?;
64 let (mut server_read, mut server_writer) = cotp_server.split().await?;
65
66 let mut over_buffer = [0u8; 100000];
67 rand::rng().fill_bytes(&mut over_buffer[..]);
68
69 for _ in 0..10 {
70 client_writer.send(&mut VecDeque::from(vec![over_buffer.to_vec()])).await?;
71 assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, over_buffer.to_vec());
72
73 server_writer.send(&mut VecDeque::from(vec![over_buffer.to_vec()])).await?;
74 assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, over_buffer.to_vec());
75 }
76
77 Ok(())
78 }
79
80 #[tokio::test]
81 #[traced_test]
82 async fn it_flushes_correctly() -> Result<(), anyhow::Error> {
83 let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;
84
85 let (mut client_read, mut client_writer) = cotp_client.split().await?;
86 let (mut server_read, mut server_writer) = cotp_server.split().await?;
87
88 let mut over_buffer = [0u8; 1024];
89 let mut data_buffer = Vec::new();
90 for _ in 1..10000 {
91 rand::rng().fill_bytes(&mut over_buffer[..]);
92 data_buffer.extend_from_slice(&over_buffer);
93 }
94
95 match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
96 Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
97 Err(_) => (),
98 }
99 loop {
100 match timeout(Duration::from_millis(100), client_writer.send(&mut VecDeque::new())).await {
101 Ok(_) => break,
102 Err(_) => (),
103 };
104 match timeout(Duration::from_millis(100), server_read.recv()).await {
105 Ok(_) => assert!(false, "Expected that all the payload was not yet send."),
106 Err(_) => (),
107 }
108 }
109 assert_eq!(server_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, data_buffer.to_vec());
110 match timeout(Duration::from_millis(100), server_writer.send(&mut VecDeque::from(vec![data_buffer.to_vec()]))).await {
111 Ok(_) => assert!(false, "Expected the data to be too large for the buffer."),
112 Err(_) => (),
113 }
114 loop {
115 match timeout(Duration::from_millis(100), server_writer.send(&mut VecDeque::new())).await {
116 Ok(_) => break,
117 Err(_) => (),
118 };
119 match timeout(Duration::from_millis(100), client_read.recv()).await {
120 Ok(_) => assert!(false, "Expected that all the payload was not yet send."),
121 Err(_) => (),
122 }
123 }
124 assert_eq!(client_read.recv().await?.ok_or_else(|| anyhow!("Connection Closed"))?, data_buffer.to_vec());
125
126 Ok(())
127 }
128
129 async fn create_cotp_connection_pair(calling_tsap_id: Option<Vec<u8>>, called_tsap_id: Option<Vec<u8>>) -> Result<(TcpCotpConnection<impl TpktReader, impl TpktWriter>, impl CotpConnection), anyhow::Error> {
130 let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;
131
132 let tpkt_listener = TcpTpktServer::listen(test_address).await?;
133 let (tpkt_client, tpkt_server) = join!(TcpTpktConnection::connect(test_address), tpkt_listener.accept());
134
135 let connect_information = CotpProtocolInformation::initiator(calling_tsap_id, called_tsap_id);
136
137 let initiator_connect_information = connect_information.clone();
138 let (cotp_initiator, cotp_acceptor) = join!(async move { TcpCotpConnection::<TcpTpktReader, TcpTpktWriter>::initiate(tpkt_client?, initiator_connect_information).await }, async {
139 let (acceptor, remote) = TcpCotpAcceptor::<TcpTpktReader, TcpTpktWriter>::new(tpkt_server?).await?;
140 assert_eq!(remote, connect_information);
141 acceptor.accept(connect_information).await
142 });
143
144 let cotp_client = cotp_initiator?;
145 let cotp_server = cotp_acceptor?;
146
147 Ok((cotp_client, cotp_server))
148 }
149}