Skip to main content

rusty_cotp/
service.rs

1use std::collections::VecDeque;
2
3use bytes::BytesMut;
4use rusty_tpkt::{ProtocolInformation, TpktConnection, TpktReader, TpktWriter};
5
6use crate::{
7    CotpConnectionParameters,
8    api::{CotpConnection, CotpError, CotpProtocolInformation, CotpReader, CotpResponder, CotpWriter},
9    packet::{
10        connection_confirm::ConnectionConfirm,
11        connection_request::ConnectionRequest,
12        data_transfer::DataTransfer,
13        parameters::{ConnectionClass, CotpParameter, TpduSize},
14        payload::TransportProtocolDataUnit,
15    },
16    parser::packet::TransportProtocolDataUnitParser,
17    serialiser::packet::serialise,
18};
19
20/// A COTP connection provides a packet based data exchange mechanism.
21///
22/// Initiator connections may be initiated via this struct. To act as a responder, the acceptor class should be used.
23pub struct RustyCotpConnection<R: TpktReader, W: TpktWriter> {
24    reader: R,
25    writer: W,
26
27    max_payload_size: usize,
28    parser: TransportProtocolDataUnitParser,
29    connection_options: CotpConnectionParameters,
30    protocol_infomation_list: Vec<Box<dyn ProtocolInformation>>,
31}
32
33impl<R: TpktReader, W: TpktWriter> RustyCotpConnection<R, W> {
34    /// Initiates a connection to a responder COTP service.
35    pub async fn initiate(connection: impl TpktConnection, options: CotpProtocolInformation, connection_options: CotpConnectionParameters) -> Result<RustyCotpConnection<impl TpktReader, impl TpktWriter>, CotpError> {
36        // FIXME WARN Log the differences between remote and local parameters.
37        let mut protocol_infomation_list = connection.get_protocol_infomation_list().clone();
38        let local_calling_tsap = options.calling_tsap_id().cloned();
39
40        let source_reference: u16 = options.initiator_reference();
41        let parser = TransportProtocolDataUnitParser::new();
42        let (mut reader, mut writer) = connection.split().await?;
43
44        send_connection_request(&mut writer, source_reference, options).await?;
45        let connection_confirm = receive_connection_confirm(&mut reader, &parser).await?;
46        let (_, max_payload_size) = calculate_remote_size_payload(connection_confirm.parameters()).await?;
47
48        let remote_called_tsap = connection_confirm.parameters().iter().filter_map(|x| if let CotpParameter::CalledTsap(tsap) = x { Some(tsap.clone()) } else { None }).last();
49        protocol_infomation_list.push(Box::new(CotpProtocolInformation::new(source_reference, connection_confirm.destination_reference(), local_calling_tsap, remote_called_tsap)));
50
51        Ok(RustyCotpConnection::new(reader, writer, max_payload_size, protocol_infomation_list, connection_options).await)
52    }
53
54    async fn new(reader: R, writer: W, max_payload_size: usize, protocol_infomation_list: Vec<Box<dyn ProtocolInformation>>, connection_options: CotpConnectionParameters) -> RustyCotpConnection<R, W> {
55        RustyCotpConnection { reader, writer, max_payload_size, parser: TransportProtocolDataUnitParser::new(), protocol_infomation_list, connection_options }
56    }
57}
58
59impl<R: TpktReader, W: TpktWriter> CotpConnection for RustyCotpConnection<R, W> {
60    fn get_protocol_infomation_list(&self) -> &Vec<Box<dyn rusty_tpkt::ProtocolInformation>> {
61        &self.protocol_infomation_list
62    }
63
64    async fn split(self) -> Result<(impl CotpReader, impl CotpWriter), CotpError> {
65        let reader = self.reader;
66        let writer = self.writer;
67        Ok((RustyCotpReader::new(reader, self.parser, self.connection_options), RustyCotpWriter::new(writer, self.max_payload_size)))
68    }
69}
70
71/// Creates a responder that consumes the underlying TPKT service to negotiate a COTP connection.
72pub struct RustyCotpAcceptor<R: TpktReader, W: TpktWriter> {
73    reader: R,
74    writer: W,
75    initiator_reference: u16,
76    max_payload_size: usize,
77    max_payload_indicator: TpduSize,
78    called_tsap_id: Option<Vec<u8>>,
79    calling_tsap_id: Option<Vec<u8>>,
80    connection_options: CotpConnectionParameters,
81    lower_layer_protocol_options_list: Vec<Box<dyn ProtocolInformation>>,
82}
83
84impl<R: TpktReader, W: TpktWriter> RustyCotpAcceptor<R, W> {
85    /// Creates an acceptor.
86    ///
87    /// This is a single use component used to upgrade an underlying TPKT connection to a COTP connection.
88    /// The TPKT connection should be a server, but this is not enforced.
89    pub async fn new(tpkt_connection: impl TpktConnection, connection_options: CotpConnectionParameters) -> Result<(RustyCotpAcceptor<impl TpktReader, impl TpktWriter>, CotpProtocolInformation), CotpError> {
90        let parser = TransportProtocolDataUnitParser::new();
91        let lower_layer_protocol_options_list = tpkt_connection.get_protocol_infomation_list().clone();
92        let (mut reader, writer) = tpkt_connection.split().await?;
93
94        let connection_request = receive_connection_request(&mut reader, &parser).await?;
95        let (max_payload_indicator, max_payload_size) = calculate_remote_size_payload(connection_request.parameters()).await?;
96        verify_class_compatibility(&connection_request).await?;
97
98        let mut calling_tsap_id = None;
99        let mut called_tsap_id = None;
100        for parameter in connection_request.parameters() {
101            match parameter {
102                CotpParameter::CallingTsap(tsap_id) => calling_tsap_id = Some(tsap_id.clone()),
103                CotpParameter::CalledTsap(tsap_id) => called_tsap_id = Some(tsap_id.clone()),
104                _ => (),
105            }
106        }
107
108        Ok((
109            RustyCotpAcceptor {
110                reader,
111                writer,
112                max_payload_size,
113                connection_options,
114                max_payload_indicator,
115                called_tsap_id: called_tsap_id.clone(),
116                calling_tsap_id: calling_tsap_id.clone(),
117                initiator_reference: connection_request.source_reference(),
118                lower_layer_protocol_options_list,
119            },
120            CotpProtocolInformation::new(connection_request.source_reference(), 0, calling_tsap_id, called_tsap_id),
121        ))
122    }
123}
124
125impl<R: TpktReader, W: TpktWriter> CotpResponder for RustyCotpAcceptor<R, W> {
126    async fn accept(mut self, options: CotpProtocolInformation) -> Result<impl CotpConnection, CotpError> {
127        send_connection_confirm(&mut self.writer, options.responder_reference(), self.initiator_reference, self.max_payload_indicator, self.calling_tsap_id, self.called_tsap_id).await?;
128        Ok(RustyCotpConnection::new(self.reader, self.writer, self.max_payload_size, self.lower_layer_protocol_options_list, self.connection_options).await)
129    }
130}
131
132/// Used to receive data to a remote a COTP host.
133pub struct RustyCotpReader<R: TpktReader> {
134    reader: R,
135    parser: TransportProtocolDataUnitParser,
136    connection_options: CotpConnectionParameters,
137
138    data_buffer: BytesMut,
139}
140
141impl<R: TpktReader> RustyCotpReader<R> {
142    fn new(reader: R, parser: TransportProtocolDataUnitParser, connection_options: CotpConnectionParameters) -> Self {
143        Self { reader, parser, data_buffer: BytesMut::new(), connection_options }
144    }
145}
146
147impl<R: TpktReader> CotpReader for RustyCotpReader<R> {
148    async fn recv(&mut self) -> Result<Option<Vec<u8>>, CotpError> {
149        loop {
150            let raw_data = match self.reader.recv().await? {
151                None => return Ok(None),
152                Some(raw_data) => raw_data,
153            };
154            let data_transfer = match self.parser.parse(raw_data.as_slice())? {
155                // Choosing the standards based option of reporting the TPDU error locally but not sending an error.
156                TransportProtocolDataUnit::ER(tpdu_error) => return Err(CotpError::ProtocolError(format!("Received an error from the remote host: {:?}", tpdu_error.reason()).into())),
157                TransportProtocolDataUnit::CR(_) => return Err(CotpError::ProtocolError("Received a Connection Request when expecting data.".into())),
158                TransportProtocolDataUnit::CC(_) => return Err(CotpError::ProtocolError("Received a Connection Config when expecting data.".into())),
159                TransportProtocolDataUnit::DR(_) => return Ok(None),
160                TransportProtocolDataUnit::DT(data_transfer) => data_transfer,
161            };
162
163            // Not performing strict checking of source and destination reference:
164            // - This is running over a TCP stream.
165            // - This package supports Class 0 only, which is a single COTP association per TCP stream. References look like the are used in Class 1-4.
166
167            self.data_buffer.extend_from_slice(data_transfer.user_data());
168            if self.data_buffer.len() > self.connection_options.max_reassembled_payload_size {
169                let reassembled_size = self.data_buffer.len();
170                let max_reassembled_size = self.connection_options.max_reassembled_payload_size;
171                self.data_buffer.clear();
172                return Err(CotpError::ProtocolError(format!("Reassembled payload size {reassembled_size} exceeds maximum payload size {max_reassembled_size}")));
173            }
174            if data_transfer.end_of_transmission() {
175                let data = self.data_buffer.to_vec();
176                self.data_buffer.clear();
177                return Ok(Some(data));
178            }
179        }
180    }
181}
182
183/// Used to send data to a remote a COTP host.
184pub struct RustyCotpWriter<W: TpktWriter> {
185    writer: W,
186    max_payload_size: usize,
187    chunks: VecDeque<Vec<u8>>,
188}
189
190impl<W: TpktWriter> RustyCotpWriter<W> {
191    fn new(writer: W, max_payload_size: usize) -> Self {
192        Self { writer, max_payload_size, chunks: VecDeque::new() }
193    }
194}
195
196impl<W: TpktWriter> CotpWriter for RustyCotpWriter<W> {
197    async fn send(&mut self, input: &mut VecDeque<Vec<u8>>) -> Result<(), CotpError> {
198        const HEADER_LENGTH: usize = 3;
199
200        while let Some(data_item) = input.pop_front() {
201            let chunks = data_item.as_slice().chunks(self.max_payload_size - HEADER_LENGTH);
202            let chunk_count = chunks.len();
203            for (chunk_index, chunk_data) in chunks.enumerate() {
204                let end_of_transmission = chunk_index + 1 >= chunk_count;
205                let tpdu = DataTransfer::new(end_of_transmission, chunk_data);
206                let tpdu_data = serialise(&TransportProtocolDataUnit::DT(tpdu))?;
207                self.chunks.push_back(tpdu_data);
208            }
209        }
210
211        while !self.chunks.is_empty() {
212            self.writer.send(&mut self.chunks).await?;
213        }
214
215        // Perform one more to ensure lower levels are also flushed even if this layer is complete.
216        self.writer.send(&mut self.chunks).await?;
217        Ok(())
218    }
219}
220
221async fn verify_class_compatibility(connection_request: &ConnectionRequest) -> Result<(), CotpError> {
222    let empty_set = Vec::new();
223    let class_parameters = connection_request
224        .parameters()
225        .iter()
226        .filter_map(|p| match p {
227            CotpParameter::AlternativeClassParameter(x) => Some(x),
228            _ => None,
229        })
230        .last()
231        .unwrap_or(&empty_set);
232
233    // Verify we can downgrade to Class 0
234    match connection_request.preferred_class() {
235        ConnectionClass::Class0 => (),
236        ConnectionClass::Class1 => (),
237        ConnectionClass::Class2 if class_parameters.contains(&&ConnectionClass::Class0) => (),
238        ConnectionClass::Class3 if class_parameters.contains(&&ConnectionClass::Class0) => (),
239        ConnectionClass::Class3 if class_parameters.contains(&&ConnectionClass::Class1) => (),
240        ConnectionClass::Class4 if class_parameters.contains(&&ConnectionClass::Class0) => (),
241        ConnectionClass::Class4 if class_parameters.contains(&&ConnectionClass::Class1) => (),
242        _ => {
243            return Err(CotpError::ProtocolError(format!("Cannot downgrade connection request to Class 0 {:?} - {:?}", connection_request.preferred_class(), class_parameters)));
244        }
245    };
246    Ok(())
247}
248
249async fn receive_connection_request(reader: &mut impl TpktReader, parser: &TransportProtocolDataUnitParser) -> Result<ConnectionRequest, CotpError> {
250    let data = match reader.recv().await {
251        Ok(Some(x)) => x,
252        Ok(None) => return Err(CotpError::ProtocolError("The connection was closed before the COTP handshake was complete.".into())),
253        Err(e) => return Err(e.into()),
254    };
255    return Ok(match parser.parse(data.as_slice())? {
256        TransportProtocolDataUnit::CR(x) => x,
257        TransportProtocolDataUnit::CC(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a connextion confirm".into())),
258        TransportProtocolDataUnit::DR(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a disconnect reqeust".into())),
259        TransportProtocolDataUnit::DT(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a data transfer".into())),
260        TransportProtocolDataUnit::ER(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a error response".into())),
261    });
262}
263
264async fn calculate_remote_size_payload(parameters: &[CotpParameter]) -> Result<(TpduSize, usize), CotpError> {
265    let parameter: &TpduSize = parameters
266        .iter()
267        .filter_map(|p| match p {
268            CotpParameter::TpduLengthParameter(x) => Some(x),
269            _ => None,
270        })
271        .last()
272        .unwrap_or(&TpduSize::Size128);
273
274    Ok(match parameter {
275        TpduSize::Size8192 => return Err(CotpError::ProtocolError("The remote side selected an 8192 bytes COTP payload but Class 0 support a maximum for 2048 bytes.".into())),
276        TpduSize::Size4096 => return Err(CotpError::ProtocolError("The remote side selected an 4096 bytes COTP payload but Class 0 support a maximum for 2048 bytes.".into())),
277        TpduSize::Unknown(x) => return Err(CotpError::ProtocolError(format!("The requested TPDU size is unknown {:?}.", x).into())),
278        TpduSize::Size128 => (TpduSize::Size128, 128),
279        TpduSize::Size256 => (TpduSize::Size256, 256),
280        TpduSize::Size512 => (TpduSize::Size512, 512),
281        TpduSize::Size1024 => (TpduSize::Size1024, 1024),
282        TpduSize::Size2048 => (TpduSize::Size2048, 2048),
283    })
284}
285
286async fn send_connection_confirm<W: TpktWriter>(writer: &mut W, source_reference: u16, destination_reference: u16, size: TpduSize, calling_tsap_id: Option<Vec<u8>>, called_tsap_id: Option<Vec<u8>>) -> Result<(), CotpError> {
287    let mut parameters = vec![CotpParameter::TpduLengthParameter(size)];
288    if let Some(tsap_id) = calling_tsap_id {
289        parameters.push(CotpParameter::CallingTsap(tsap_id));
290    }
291    if let Some(tsap_id) = called_tsap_id {
292        parameters.push(CotpParameter::CalledTsap(tsap_id));
293    }
294
295    let payload = serialise(&TransportProtocolDataUnit::CC(ConnectionConfirm::new(0, source_reference, destination_reference, ConnectionClass::Class0, vec![], parameters, &[])))?;
296    Ok(writer.send(&mut VecDeque::from_iter(vec![payload].into_iter())).await?)
297}
298
299async fn send_connection_request(writer: &mut impl TpktWriter, source_reference: u16, options: CotpProtocolInformation) -> Result<(), CotpError> {
300    let mut parameters = vec![CotpParameter::TpduLengthParameter(TpduSize::Size2048)];
301    if let Some(calling_tsap) = options.calling_tsap_id() {
302        parameters.push(CotpParameter::CallingTsap(calling_tsap.clone()));
303    }
304    if let Some(called_tsap) = options.called_tsap_id() {
305        parameters.push(CotpParameter::CalledTsap(called_tsap.clone()));
306    }
307
308    let payload = serialise(&TransportProtocolDataUnit::CR(ConnectionRequest::new(source_reference, 0, ConnectionClass::Class0, vec![], parameters, &[])))?;
309    Ok(writer.send(&mut VecDeque::from_iter(vec![payload].into_iter())).await?)
310}
311
312async fn receive_connection_confirm(reader: &mut impl TpktReader, parser: &TransportProtocolDataUnitParser) -> Result<ConnectionConfirm, CotpError> {
313    let data = match reader.recv().await {
314        Ok(Some(x)) => x,
315        Ok(None) => return Err(CotpError::ProtocolError("The connection was closed before the COTP handshake was complete.".into())),
316        Err(e) => return Err(e.into()),
317    };
318    return Ok(match parser.parse(data.as_slice())? {
319        TransportProtocolDataUnit::CC(x) if x.preferred_class() != &ConnectionClass::Class0 => return Err(CotpError::ProtocolError("Remote failed to select COTP Class 0.".into())),
320        TransportProtocolDataUnit::CC(x) => x,
321        TransportProtocolDataUnit::CR(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a connection request".into())),
322        TransportProtocolDataUnit::DR(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a disconnect reqeust".into())),
323        TransportProtocolDataUnit::DT(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a data transfer".into())),
324        TransportProtocolDataUnit::ER(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a error response".into())),
325    });
326}