Skip to main content

rusty_cotp/
lib.rs

1mod api;
2mod packet;
3mod parser;
4mod serialiser;
5mod service;
6
7pub use crate::api::*;
8pub use crate::service::*;
9
#[cfg(test)]
mod tests {
    //! Integration-style tests that run a COTP initiator and acceptor against
    //! each other over a loopback TPKT connection.

    use std::{collections::VecDeque, ops::Range, time::Duration};

    use rand::RngCore;
    use rusty_tpkt::{TcpTpktConnection, TcpTpktReader, TcpTpktServer, TcpTpktWriter, TpktReader, TpktWriter};
    use tokio::{join, time::timeout};
    use tracing_test::traced_test;

    use crate::api::{CotpConnection, CotpProtocolInformation, CotpReader, CotpResponder, CotpWriter};

    use super::*;

    /// Upper bound for every single send/receive attempt in `it_flushes_correctly`.
    const STEP_TIMEOUT: Duration = Duration::from_millis(100);

    /// Asserts that `received` carries exactly the bytes in `expected`.
    ///
    /// # Panics
    ///
    /// Panics when the peer reported the connection as closed, or when the
    /// received payload differs from `expected`. Thanks to `#[track_caller]`
    /// the failure is reported at the calling line of the test.
    #[track_caller]
    fn expect_data(received: api::CotpRecvResult, expected: &[u8]) {
        match received {
            api::CotpRecvResult::Closed => panic!("Connection was unexpectedly closed."),
            api::CotpRecvResult::Data(items) => assert_eq!(items, expected.to_vec()),
        }
    }

    /// Sends a small payload in each direction and checks it arrives unchanged.
    #[tokio::test]
    #[traced_test]
    async fn it_transfers_data() -> Result<(), anyhow::Error> {
        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;

        let (mut client_read, mut client_writer) = cotp_client.split().await?;
        let (mut server_read, mut server_writer) = cotp_server.split().await?;

        client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
        expect_data(server_read.recv().await?, b"ABCD");

        server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
        expect_data(client_read.recv().await?, b"EFGH");

        Ok(())
    }

    /// Same round trip as `it_transfers_data`, but with explicit calling and
    /// called TSAP identifiers supplied during connection setup.
    #[tokio::test]
    #[traced_test]
    async fn it_transfers_data_with_tsaps() -> Result<(), anyhow::Error> {
        let (cotp_client, cotp_server) = create_cotp_connection_pair(Some(vec![1u8, 2u8, 3u8]), Some(vec![4u8, 5u8, 6u8])).await?;

        let (mut client_read, mut client_writer) = cotp_client.split().await?;
        let (mut server_read, mut server_writer) = cotp_server.split().await?;

        client_writer.send(&mut VecDeque::from(vec![b"ABCD".to_vec()])).await?;
        expect_data(server_read.recv().await?, b"ABCD");

        server_writer.send(&mut VecDeque::from(vec![b"EFGH".to_vec()])).await?;
        expect_data(client_read.recv().await?, b"EFGH");

        Ok(())
    }

    /// Repeatedly exchanges a 100 000 byte random payload in both directions.
    ///
    /// NOTE(review): the payload is presumably larger than one COTP segment, so
    /// this exercises segmentation and reassembly — confirm against the
    /// negotiated TPDU size.
    #[tokio::test]
    #[traced_test]
    async fn it_transfers_data_over_multiple_segments() -> Result<(), anyhow::Error> {
        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;

        let (mut client_read, mut client_writer) = cotp_client.split().await?;
        let (mut server_read, mut server_writer) = cotp_server.split().await?;

        // Heap-allocated so the 100 kB buffer is not embedded in the test future.
        let mut payload = vec![0u8; 100000];
        rand::rng().fill_bytes(&mut payload[..]);

        for _ in 0..10 {
            client_writer.send(&mut VecDeque::from(vec![payload.clone()])).await?;
            expect_data(server_read.recv().await?, &payload);

            server_writer.send(&mut VecDeque::from(vec![payload.clone()])).await?;
            expect_data(client_read.recv().await?, &payload);
        }

        Ok(())
    }

    /// Sends a payload that cannot be written within one timeout window and
    /// verifies that follow-up `send` calls with an empty queue eventually
    /// complete the transfer, in both directions.
    ///
    /// While the writer is still flushing, the reader must not yet yield a
    /// result; once the flush completes, the reader must yield the whole
    /// payload.
    #[tokio::test]
    #[traced_test]
    async fn it_flushes_correctly() -> Result<(), anyhow::Error> {
        let (cotp_client, cotp_server) = create_cotp_connection_pair(None, None).await?;

        let (mut client_read, mut client_writer) = cotp_client.split().await?;
        let (mut server_read, mut server_writer) = cotp_server.split().await?;

        // 9999 chunks of 1024 random bytes, the same size the chunked fill loop produced.
        let mut data_buffer = vec![0u8; 9999 * 1024];
        rand::rng().fill_bytes(&mut data_buffer[..]);

        // Client -> server.
        let first_attempt = timeout(STEP_TIMEOUT, client_writer.send(&mut VecDeque::from(vec![data_buffer.clone()]))).await;
        assert!(first_attempt.is_err(), "Expected the data to be too large for the buffer.");
        loop {
            // Presumably an empty queue makes the writer continue with what it already buffered.
            if let Ok(flushed) = timeout(STEP_TIMEOUT, client_writer.send(&mut VecDeque::new())).await {
                // Surface a failed flush instead of silently treating it as success.
                flushed?;
                break;
            }
            let early_recv = timeout(STEP_TIMEOUT, server_read.recv()).await;
            assert!(early_recv.is_err(), "Expected that all the payload was not yet send.");
        }
        expect_data(server_read.recv().await?, &data_buffer);

        // Server -> client.
        let first_attempt = timeout(STEP_TIMEOUT, server_writer.send(&mut VecDeque::from(vec![data_buffer.clone()]))).await;
        assert!(first_attempt.is_err(), "Expected the data to be too large for the buffer.");
        loop {
            if let Ok(flushed) = timeout(STEP_TIMEOUT, server_writer.send(&mut VecDeque::new())).await {
                flushed?;
                break;
            }
            let early_recv = timeout(STEP_TIMEOUT, client_read.recv()).await;
            assert!(early_recv.is_err(), "Expected that all the payload was not yet send.");
        }
        expect_data(client_read.recv().await?, &data_buffer);

        Ok(())
    }

    /// Creates a connected COTP pair over a fresh loopback TPKT connection.
    ///
    /// Returns `(initiator, acceptor)`. The acceptor side also asserts that the
    /// protocol information announced by the remote equals the information the
    /// initiator was configured with.
    ///
    /// # Errors
    ///
    /// Propagates any failure from parsing the address, listening, connecting,
    /// or the COTP connection setup on either side.
    async fn create_cotp_connection_pair(calling_tsap_id: Option<Vec<u8>>, called_tsap_id: Option<Vec<u8>>) -> Result<(TcpCotpConnection<impl TpktReader, impl TpktWriter>, impl CotpConnection), anyhow::Error> {
        // NOTE(review): the port is chosen at random from 20000..30000, so tests running in
        // parallel can collide with each other or with an unrelated local service.
        let test_address = format!("127.0.0.1:{}", rand::random_range::<u16, Range<u16>>(20000..30000)).parse()?;

        let tpkt_listener = TcpTpktServer::listen(test_address).await?;
        // Connect and accept must be driven concurrently; neither completes without the other.
        let (tpkt_client, tpkt_server) = join!(TcpTpktConnection::connect(test_address), tpkt_listener.accept());

        let connect_information = CotpProtocolInformation::initiator(calling_tsap_id, called_tsap_id);

        let initiator_connect_information = connect_information.clone();
        let (cotp_initiator, cotp_acceptor) = join!(
            async move { TcpCotpConnection::<TcpTpktReader, TcpTpktWriter>::initiate(tpkt_client?, initiator_connect_information).await },
            async {
                let (acceptor, remote) = TcpCotpAcceptor::<TcpTpktReader, TcpTpktWriter>::new(tpkt_server?).await?;
                assert_eq!(remote, connect_information);
                acceptor.accept(connect_information).await
            }
        );

        let cotp_client = cotp_initiator?;
        let cotp_server = cotp_acceptor?;

        Ok((cotp_client, cotp_server))
    }
}