1use std::collections::VecDeque;
2
3use bytes::BytesMut;
4use rusty_tpkt::{ProtocolInformation, TpktConnection, TpktReader, TpktWriter};
5
6use crate::{
7 api::{CotpConnection, CotpError, CotpProtocolInformation, CotpReader, CotpResponder, CotpWriter},
8 packet::{
9 connection_confirm::ConnectionConfirm,
10 connection_request::ConnectionRequest,
11 data_transfer::DataTransfer,
12 parameters::{ConnectionClass, CotpParameter, TpduSize},
13 payload::TransportProtocolDataUnit,
14 },
15 parser::packet::TransportProtocolDataUnitParser,
16 serialiser::packet::serialise,
17};
18
19pub struct TcpCotpConnection<R: TpktReader, W: TpktWriter> {
20 reader: R,
21 writer: W,
22
23 max_payload_size: usize,
24 parser: TransportProtocolDataUnitParser,
25 protocol_infomation_list: Vec<Box<dyn ProtocolInformation>>,
26}
27
28impl<R: TpktReader, W: TpktWriter> TcpCotpConnection<R, W> {
29 pub async fn initiate(connection: impl TpktConnection, options: CotpProtocolInformation) -> Result<TcpCotpConnection<impl TpktReader, impl TpktWriter>, CotpError> {
30 let mut protocol_infomation_list = connection.get_protocol_infomation_list().clone();
32 let local_calling_tsap = options.calling_tsap_id().cloned();
33
34 let source_reference: u16 = options.initiator_reference();
35 let parser = TransportProtocolDataUnitParser::new();
36 let (mut reader, mut writer) = connection.split().await?;
37
38 send_connection_request(&mut writer, source_reference, options).await?;
39 let connection_confirm = receive_connection_confirm(&mut reader, &parser).await?;
40 let (_, max_payload_size) = calculate_remote_size_payload(connection_confirm.parameters()).await?;
41
42 let remote_called_tsap = connection_confirm.parameters().iter().filter_map(|x| if let CotpParameter::CalledTsap(tsap) = x { Some(tsap.clone()) } else { None }).last();
43 protocol_infomation_list.push(Box::new(CotpProtocolInformation::new(source_reference, connection_confirm.destination_reference(), local_calling_tsap, remote_called_tsap)));
44
45 Ok(TcpCotpConnection::new(reader, writer, max_payload_size, protocol_infomation_list).await)
46 }
47
48 async fn new(reader: R, writer: W, max_payload_size: usize, protocol_infomation_list: Vec<Box<dyn ProtocolInformation>>) -> TcpCotpConnection<R, W> {
49 TcpCotpConnection { reader, writer, max_payload_size, parser: TransportProtocolDataUnitParser::new(), protocol_infomation_list }
50 }
51}
52
53impl<R: TpktReader, W: TpktWriter> CotpConnection for TcpCotpConnection<R, W> {
54 fn get_protocol_infomation_list(&self) -> &Vec<Box<dyn rusty_tpkt::ProtocolInformation>> {
55 &self.protocol_infomation_list
56 }
57
58 async fn split(self) -> Result<(impl CotpReader, impl CotpWriter), CotpError> {
59 let reader = self.reader;
60 let writer = self.writer;
61 Ok((TcpCotpReader::new(reader, self.parser), TcpCotpWriter::new(writer, self.max_payload_size)))
62 }
63}
64
65pub struct TcpCotpAcceptor<R: TpktReader, W: TpktWriter> {
66 reader: R,
67 writer: W,
68 initiator_reference: u16,
69 max_payload_size: usize,
70 max_payload_indicator: TpduSize,
71 called_tsap_id: Option<Vec<u8>>,
72 calling_tsap_id: Option<Vec<u8>>,
73 lower_layer_protocol_options_list: Vec<Box<dyn ProtocolInformation>>,
74}
75
76impl<R: TpktReader, W: TpktWriter> TcpCotpAcceptor<R, W> {
77 pub async fn new(tpkt_connection: impl TpktConnection) -> Result<(TcpCotpAcceptor<impl TpktReader, impl TpktWriter>, CotpProtocolInformation), CotpError> {
78 let parser = TransportProtocolDataUnitParser::new();
79 let lower_layer_protocol_options_list = tpkt_connection.get_protocol_infomation_list().clone();
80 let (mut reader, writer) = tpkt_connection.split().await?;
81
82 let connection_request = receive_connection_request(&mut reader, &parser).await?;
83 let (max_payload_indicator, max_payload_size) = calculate_remote_size_payload(connection_request.parameters()).await?;
84 verify_class_compatibility(&connection_request).await?;
85
86 let mut calling_tsap_id = None;
87 let mut called_tsap_id = None;
88 for parameter in connection_request.parameters() {
89 match parameter {
90 CotpParameter::CallingTsap(tsap_id) => calling_tsap_id = Some(tsap_id.clone()),
91 CotpParameter::CalledTsap(tsap_id) => called_tsap_id = Some(tsap_id.clone()),
92 _ => (),
93 }
94 }
95
96 Ok((
97 TcpCotpAcceptor {
98 reader,
99 writer,
100 max_payload_size,
101 max_payload_indicator,
102 called_tsap_id: called_tsap_id.clone(),
103 calling_tsap_id: calling_tsap_id.clone(),
104 initiator_reference: connection_request.source_reference(),
105 lower_layer_protocol_options_list,
106 },
107 CotpProtocolInformation::new(connection_request.source_reference(), 0, calling_tsap_id, called_tsap_id),
108 ))
109 }
110}
111
112impl<R: TpktReader, W: TpktWriter> CotpResponder for TcpCotpAcceptor<R, W> {
113 async fn accept(mut self, options: CotpProtocolInformation) -> Result<impl CotpConnection, CotpError> {
114 send_connection_confirm(&mut self.writer, options.responder_reference(), self.initiator_reference, self.max_payload_indicator, self.calling_tsap_id, self.called_tsap_id).await?;
115 Ok(TcpCotpConnection::new(self.reader, self.writer, self.max_payload_size, self.lower_layer_protocol_options_list).await)
116 }
117}
118
119pub struct TcpCotpReader<R: TpktReader> {
120 reader: R,
121 parser: TransportProtocolDataUnitParser,
122
123 data_buffer: BytesMut,
124}
125
126impl<R: TpktReader> TcpCotpReader<R> {
127 pub fn new(reader: R, parser: TransportProtocolDataUnitParser) -> Self {
128 Self { reader, parser, data_buffer: BytesMut::new() }
129 }
130}
131
132impl<R: TpktReader> CotpReader for TcpCotpReader<R> {
133 async fn recv(&mut self) -> Result<Option<Vec<u8>>, CotpError> {
134 loop {
135 let raw_data = match self.reader.recv().await? {
138 None => return Ok(None),
139 Some(raw_data) => raw_data,
140 };
141 let data_transfer = match self.parser.parse(raw_data.as_slice())? {
142 TransportProtocolDataUnit::ER(tpdu_error) => return Err(CotpError::ProtocolError(format!("Received an error from the remote host: {:?}", tpdu_error.reason()).into())),
144 TransportProtocolDataUnit::CR(_) => return Err(CotpError::ProtocolError("Received a Connection Request when expecting data.".into())),
145 TransportProtocolDataUnit::CC(_) => return Err(CotpError::ProtocolError("Received a Connection Config when expecting data.".into())),
146 TransportProtocolDataUnit::DR(_) => return Ok(None),
147 TransportProtocolDataUnit::DT(data_transfer) => data_transfer,
148 };
149
150 self.data_buffer.extend_from_slice(data_transfer.user_data());
156 if data_transfer.end_of_transmission() {
157 let data = self.data_buffer.to_vec();
158 self.data_buffer.clear();
159 return Ok(Some(data));
160 }
161 }
162 }
163}
164
165pub struct TcpCotpWriter<W: TpktWriter> {
166 writer: W,
167 max_payload_size: usize,
168 chunks: VecDeque<Vec<u8>>,
169}
170
171impl<W: TpktWriter> TcpCotpWriter<W> {
172 pub fn new(writer: W, max_payload_size: usize) -> Self {
173 Self { writer, max_payload_size, chunks: VecDeque::new() }
174 }
175}
176
177impl<W: TpktWriter> CotpWriter for TcpCotpWriter<W> {
178 async fn send(&mut self, input: &mut VecDeque<Vec<u8>>) -> Result<(), CotpError> {
179 const HEADER_LENGTH: usize = 3;
180
181 while let Some(data_item) = input.pop_front() {
182 let chunks = data_item.as_slice().chunks(self.max_payload_size - HEADER_LENGTH);
183 let chunk_count = chunks.len();
184 for (chunk_index, chunk_data) in chunks.enumerate() {
185 let end_of_transmission = chunk_index + 1 >= chunk_count;
186 let tpdu = DataTransfer::new(end_of_transmission, chunk_data);
187 let tpdu_data = serialise(&TransportProtocolDataUnit::DT(tpdu))?;
188 self.chunks.push_back(tpdu_data);
189 }
190 }
191
192 while !self.chunks.is_empty() {
193 self.writer.send(&mut self.chunks).await?;
194 }
195
196 self.writer.send(&mut self.chunks).await?;
198 Ok(())
199 }
200}
201
202async fn verify_class_compatibility(connection_request: &ConnectionRequest) -> Result<(), CotpError> {
203 let empty_set = Vec::new();
204 let class_parameters = connection_request
205 .parameters()
206 .iter()
207 .filter_map(|p| match p {
208 CotpParameter::AlternativeClassParameter(x) => Some(x),
209 _ => None,
210 })
211 .last()
212 .unwrap_or(&empty_set);
213
214 match connection_request.preferred_class() {
216 ConnectionClass::Class0 => (),
217 ConnectionClass::Class1 => (),
218 ConnectionClass::Class2 if class_parameters.contains(&&ConnectionClass::Class0) => (),
219 ConnectionClass::Class3 if class_parameters.contains(&&ConnectionClass::Class0) => (),
220 ConnectionClass::Class3 if class_parameters.contains(&&ConnectionClass::Class1) => (),
221 ConnectionClass::Class4 if class_parameters.contains(&&ConnectionClass::Class0) => (),
222 ConnectionClass::Class4 if class_parameters.contains(&&ConnectionClass::Class1) => (),
223 _ => {
224 return Err(CotpError::ProtocolError(format!("Cannot downgrade connection request to Class 0 {:?} - {:?}", connection_request.preferred_class(), class_parameters)));
225 }
226 };
227 Ok(())
228}
229
230async fn receive_connection_request(reader: &mut impl TpktReader, parser: &TransportProtocolDataUnitParser) -> Result<ConnectionRequest, CotpError> {
231 let data = match reader.recv().await {
232 Ok(Some(x)) => x,
233 Ok(None) => return Err(CotpError::ProtocolError("The connection was closed before the COTP handshake was complete.".into())),
234 Err(e) => return Err(e.into()),
235 };
236 return Ok(match parser.parse(data.as_slice())? {
237 TransportProtocolDataUnit::CR(x) => x,
238 TransportProtocolDataUnit::CC(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a connextion confirm".into())),
239 TransportProtocolDataUnit::DR(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a disconnect reqeust".into())),
240 TransportProtocolDataUnit::DT(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a data transfer".into())),
241 TransportProtocolDataUnit::ER(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a error response".into())),
242 });
243}
244
245async fn calculate_remote_size_payload(parameters: &[CotpParameter]) -> Result<(TpduSize, usize), CotpError> {
246 let parameter: &TpduSize = parameters
247 .iter()
248 .filter_map(|p| match p {
249 CotpParameter::TpduLengthParameter(x) => Some(x),
250 _ => None,
251 })
252 .last()
253 .unwrap_or(&TpduSize::Size128);
254
255 Ok(match parameter {
256 TpduSize::Size8192 => return Err(CotpError::ProtocolError("The remote side selected an 8192 bytes COTP payload but Class 0 support a maximum for 2048 bytes.".into())),
257 TpduSize::Size4096 => return Err(CotpError::ProtocolError("The remote side selected an 4096 bytes COTP payload but Class 0 support a maximum for 2048 bytes.".into())),
258 TpduSize::Unknown(x) => return Err(CotpError::ProtocolError(format!("The requested TPDU size is unknown {:?}.", x).into())),
259 TpduSize::Size128 => (TpduSize::Size128, 128),
260 TpduSize::Size256 => (TpduSize::Size256, 256),
261 TpduSize::Size512 => (TpduSize::Size512, 512),
262 TpduSize::Size1024 => (TpduSize::Size1024, 1024),
263 TpduSize::Size2048 => (TpduSize::Size2048, 2048),
264 })
265}
266
267async fn send_connection_confirm<W: TpktWriter>(writer: &mut W, source_reference: u16, destination_reference: u16, size: TpduSize, calling_tsap_id: Option<Vec<u8>>, called_tsap_id: Option<Vec<u8>>) -> Result<(), CotpError> {
268 let mut parameters = vec![CotpParameter::TpduLengthParameter(size)];
269 if let Some(tsap_id) = calling_tsap_id {
270 parameters.push(CotpParameter::CallingTsap(tsap_id));
271 }
272 if let Some(tsap_id) = called_tsap_id {
273 parameters.push(CotpParameter::CalledTsap(tsap_id));
274 }
275
276 let payload = serialise(&TransportProtocolDataUnit::CC(ConnectionConfirm::new(0, source_reference, destination_reference, ConnectionClass::Class0, vec![], parameters, &[])))?;
277 Ok(writer.send(&mut VecDeque::from_iter(vec![payload].into_iter())).await?)
278}
279
280async fn send_connection_request(writer: &mut impl TpktWriter, source_reference: u16, options: CotpProtocolInformation) -> Result<(), CotpError> {
281 let mut parameters = vec![CotpParameter::TpduLengthParameter(TpduSize::Size2048)];
282 if let Some(calling_tsap) = options.calling_tsap_id() {
283 parameters.push(CotpParameter::CallingTsap(calling_tsap.clone()));
284 }
285 if let Some(called_tsap) = options.called_tsap_id() {
286 parameters.push(CotpParameter::CalledTsap(called_tsap.clone()));
287 }
288
289 let payload = serialise(&TransportProtocolDataUnit::CR(ConnectionRequest::new(source_reference, 0, ConnectionClass::Class0, vec![], parameters, &[])))?;
290 Ok(writer.send(&mut VecDeque::from_iter(vec![payload].into_iter())).await?)
291}
292
293async fn receive_connection_confirm(reader: &mut impl TpktReader, parser: &TransportProtocolDataUnitParser) -> Result<ConnectionConfirm, CotpError> {
294 let data = match reader.recv().await {
295 Ok(Some(x)) => x,
296 Ok(None) => return Err(CotpError::ProtocolError("The connection was closed before the COTP handshake was complete.".into())),
297 Err(e) => return Err(e.into()),
298 };
299 return Ok(match parser.parse(data.as_slice())? {
300 TransportProtocolDataUnit::CC(x) if x.preferred_class() != &ConnectionClass::Class0 => return Err(CotpError::ProtocolError("Remote failed to select COTP Class 0.".into())),
301 TransportProtocolDataUnit::CC(x) => x,
302 TransportProtocolDataUnit::CR(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a connection request".into())),
303 TransportProtocolDataUnit::DR(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a disconnect reqeust".into())),
304 TransportProtocolDataUnit::DT(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a data transfer".into())),
305 TransportProtocolDataUnit::ER(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a error response".into())),
306 });
307}