Skip to main content

rusty_cotp/
service.rs

1use std::collections::VecDeque;
2
3use bytes::BytesMut;
4use rusty_tpkt::{TpktConnection, TpktReader, TpktWriter};
5
6use crate::{
7    api::{CotpConnection, CotpError, CotpProtocolInformation, CotpReader, CotpRecvResult, CotpResponder, CotpWriter},
8    packet::{
9        connection_confirm::ConnectionConfirm,
10        connection_request::ConnectionRequest,
11        data_transfer::DataTransfer,
12        parameters::{ConnectionClass, CotpParameter, TpduSize},
13        payload::TransportProtocolDataUnit,
14    },
15    parser::packet::TransportProtocolDataUnitParser,
16    serialiser::packet::serialise,
17};
18
19pub struct TcpCotpConnection<R: TpktReader, W: TpktWriter> {
20    reader: R,
21    writer: W,
22
23    max_payload_size: usize,
24    parser: TransportProtocolDataUnitParser,
25}
26
27impl<R: TpktReader, W: TpktWriter> TcpCotpConnection<R, W> {
28    pub async fn initiate(connection: impl TpktConnection, options: CotpProtocolInformation) -> Result<TcpCotpConnection<impl TpktReader, impl TpktWriter>, CotpError> {
29        let source_reference: u16 = options.initiator_reference();
30        let parser = TransportProtocolDataUnitParser::new();
31        let (mut reader, mut writer) = connection.split().await?;
32
33        send_connection_request(&mut writer, source_reference, options).await?;
34        let connection_confirm = receive_connection_confirm(&mut reader, &parser).await?;
35        let (_, max_payload_size) = calculate_remote_size_payload(connection_confirm.parameters()).await?;
36
37        Ok(TcpCotpConnection::new(reader, writer, max_payload_size).await)
38    }
39
40    async fn new(reader: R, writer: W, max_payload_size: usize) -> TcpCotpConnection<R, W> {
41        TcpCotpConnection { reader, writer, max_payload_size, parser: TransportProtocolDataUnitParser::new() }
42    }
43}
44
45impl<R: TpktReader, W: TpktWriter> CotpConnection for TcpCotpConnection<R, W> {
46    fn get_protocol_infomation_list(&self) -> &Vec<Box<dyn rusty_tpkt::ProtocolInformation>> {
47        todo!()
48    }
49
50    async fn split(self) -> Result<(impl CotpReader, impl CotpWriter), CotpError> {
51        let reader = self.reader;
52        let writer = self.writer;
53        Ok((TcpCotpReader::new(reader, self.parser), TcpCotpWriter::new(writer, self.max_payload_size)))
54    }
55}
56
57pub struct TcpCotpAcceptor<R: TpktReader, W: TpktWriter> {
58    reader: R,
59    writer: W,
60    initiator_reference: u16,
61    max_payload_size: usize,
62    max_payload_indicator: TpduSize,
63    called_tsap_id: Option<Vec<u8>>,
64    calling_tsap_id: Option<Vec<u8>>,
65}
66
67impl<R: TpktReader, W: TpktWriter> TcpCotpAcceptor<R, W> {
68    pub async fn new(tpkt_connection: impl TpktConnection) -> Result<(TcpCotpAcceptor<impl TpktReader, impl TpktWriter>, CotpProtocolInformation), CotpError> {
69        let parser = TransportProtocolDataUnitParser::new();
70        let (mut reader, writer) = tpkt_connection.split().await?;
71
72        let connection_request = receive_connection_request(&mut reader, &parser).await?;
73        let (max_payload_indicator, max_payload_size) = calculate_remote_size_payload(connection_request.parameters()).await?;
74        verify_class_compatibility(&connection_request).await?;
75
76        let mut calling_tsap_id = None;
77        let mut called_tsap_id = None;
78        for parameter in connection_request.parameters() {
79            match parameter {
80                CotpParameter::CallingTsap(tsap_id) => calling_tsap_id = Some(tsap_id.clone()),
81                CotpParameter::CalledTsap(tsap_id) => called_tsap_id = Some(tsap_id.clone()),
82                _ => (),
83            }
84        }
85
86        Ok((
87            TcpCotpAcceptor { reader, writer, max_payload_size, max_payload_indicator, called_tsap_id: called_tsap_id.clone(), calling_tsap_id: calling_tsap_id.clone(), initiator_reference: connection_request.source_reference() },
88            CotpProtocolInformation::new(connection_request.source_reference(), 0, calling_tsap_id, called_tsap_id),
89        ))
90    }
91}
92
93impl<R: TpktReader, W: TpktWriter> CotpResponder for TcpCotpAcceptor<R, W> {
94    async fn accept(mut self, options: CotpProtocolInformation) -> Result<impl CotpConnection, CotpError> {
95        send_connection_confirm(&mut self.writer, options.responder_reference(), self.initiator_reference, self.max_payload_indicator, self.calling_tsap_id, self.called_tsap_id).await?;
96        Ok(TcpCotpConnection::new(self.reader, self.writer, self.max_payload_size).await)
97    }
98}
99
100pub struct TcpCotpReader<R: TpktReader> {
101    // Not caring about the size of the payload we receive.
102    reader: R,
103    parser: TransportProtocolDataUnitParser,
104
105    data_buffer: BytesMut,
106}
107
108impl<R: TpktReader> TcpCotpReader<R> {
109    pub fn new(reader: R, parser: TransportProtocolDataUnitParser) -> Self {
110        Self { reader, parser, data_buffer: BytesMut::new() }
111    }
112}
113
114impl<R: TpktReader> CotpReader for TcpCotpReader<R> {
115    async fn recv(&mut self) -> Result<CotpRecvResult, CotpError> {
116        loop {
117            // I don't really care to check max size. It is 2025.
118            let raw_data = match self.reader.recv().await? {
119                None => return Ok(CotpRecvResult::Closed),
120                Some(raw_data) => raw_data,
121            };
122            let data_transfer = match self.parser.parse(raw_data.as_slice())? {
123                // Choosing the standards based option of reporting the TPDU error locally but not sending an error.
124                TransportProtocolDataUnit::ER(tpdu_error) => return Err(CotpError::ProtocolError(format!("Received an error from the remote host: {:?}", tpdu_error.reason()).into())),
125                TransportProtocolDataUnit::CR(_) => return Err(CotpError::ProtocolError("Received a Connection Request when expecting data.".into())),
126                TransportProtocolDataUnit::CC(_) => return Err(CotpError::ProtocolError("Received a Connection Config when expecting data.".into())),
127                TransportProtocolDataUnit::DR(_) => return Ok(CotpRecvResult::Closed),
128                TransportProtocolDataUnit::DT(data_transfer) => data_transfer,
129            };
130            // I do not really care about the source and destination reference here. It is over a TCP stream. I'd rather keep it relaxed and avoid interop issues.
131
132            self.data_buffer.extend_from_slice(data_transfer.user_data());
133            if data_transfer.end_of_transmission() {
134                let data = self.data_buffer.to_vec();
135                self.data_buffer.clear();
136                return Ok(CotpRecvResult::Data(data));
137            }
138        }
139    }
140}
141
142pub struct TcpCotpWriter<W: TpktWriter> {
143    writer: W,
144    max_payload_size: usize,
145    chunks: VecDeque<Vec<u8>>,
146}
147
148impl<W: TpktWriter> TcpCotpWriter<W> {
149    pub fn new(writer: W, max_payload_size: usize) -> Self {
150        Self { writer, max_payload_size, chunks: VecDeque::new() }
151    }
152}
153
154impl<W: TpktWriter> CotpWriter for TcpCotpWriter<W> {
155    async fn send(&mut self, input: &mut VecDeque<Vec<u8>>) -> Result<(), CotpError> {
156        const HEADER_LENGTH: usize = 3;
157
158        while let Some(data_item) = input.pop_front() {
159            let chunks = data_item.as_slice().chunks(self.max_payload_size - HEADER_LENGTH);
160            let chunk_count = chunks.len();
161            for (chunk_index, chunk_data) in chunks.enumerate() {
162                let end_of_transmission = chunk_index + 1 >= chunk_count;
163                let tpdu = DataTransfer::new(end_of_transmission, chunk_data);
164                let tpdu_data = serialise(&TransportProtocolDataUnit::DT(tpdu))?;
165                self.chunks.push_back(tpdu_data);
166            }
167        }
168
169        while !self.chunks.is_empty() {
170            self.writer.send(&mut self.chunks).await?;
171        }
172
173        // Perform one more to ensure lower levels are also flushed even if this layer is complete.
174        self.writer.send(&mut self.chunks).await?;
175        Ok(())
176    }
177}
178
179async fn verify_class_compatibility(connection_request: &ConnectionRequest) -> Result<(), CotpError> {
180    let empty_set = Vec::new();
181    let class_parameters = connection_request
182        .parameters()
183        .iter()
184        .filter_map(|p| match p {
185            CotpParameter::AlternativeClassParameter(x) => Some(x),
186            _ => None,
187        })
188        .last()
189        .unwrap_or(&empty_set);
190
191    // Verify we can downgrade to Class 0
192    match connection_request.preferred_class() {
193        ConnectionClass::Class0 => (),
194        ConnectionClass::Class1 => (),
195        ConnectionClass::Class2 if class_parameters.contains(&&ConnectionClass::Class0) => (),
196        ConnectionClass::Class3 if class_parameters.contains(&&ConnectionClass::Class0) => (),
197        ConnectionClass::Class3 if class_parameters.contains(&&ConnectionClass::Class1) => (),
198        ConnectionClass::Class4 if class_parameters.contains(&&ConnectionClass::Class0) => (),
199        ConnectionClass::Class4 if class_parameters.contains(&&ConnectionClass::Class1) => (),
200        _ => {
201            return Err(CotpError::ProtocolError(format!("Cannot downgrade connection request to Class 0 {:?} - {:?}", connection_request.preferred_class(), class_parameters)));
202        }
203    };
204    Ok(())
205}
206
207async fn receive_connection_request(reader: &mut impl TpktReader, parser: &TransportProtocolDataUnitParser) -> Result<ConnectionRequest, CotpError> {
208    let data = match reader.recv().await {
209        Ok(Some(x)) => x,
210        Ok(None) => return Err(CotpError::ProtocolError("The connection was closed before the COTP handshake was complete.".into())),
211        Err(e) => return Err(e.into()),
212    };
213    return Ok(match parser.parse(data.as_slice())? {
214        TransportProtocolDataUnit::CR(x) => x,
215        TransportProtocolDataUnit::CC(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a connextion confirm".into())),
216        TransportProtocolDataUnit::DR(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a disconnect reqeust".into())),
217        TransportProtocolDataUnit::DT(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a data transfer".into())),
218        TransportProtocolDataUnit::ER(_) => return Err(CotpError::ProtocolError("Expected connection request on handshake but got a error response".into())),
219    });
220}
221
222async fn calculate_remote_size_payload(parameters: &[CotpParameter]) -> Result<(TpduSize, usize), CotpError> {
223    let parameter: &TpduSize = parameters
224        .iter()
225        .filter_map(|p| match p {
226            CotpParameter::TpduLengthParameter(x) => Some(x),
227            _ => None,
228        })
229        .last()
230        .unwrap_or(&TpduSize::Size128);
231
232    Ok(match parameter {
233        TpduSize::Size8192 => return Err(CotpError::ProtocolError("The remote side selected an 8192 bytes COTP payload but Class 0 support a maximum for 2048 bytes.".into())),
234        TpduSize::Size4096 => return Err(CotpError::ProtocolError("The remote side selected an 4096 bytes COTP payload but Class 0 support a maximum for 2048 bytes.".into())),
235        TpduSize::Unknown(x) => return Err(CotpError::ProtocolError(format!("The requested TPDU size is unknown {:?}.", x).into())),
236        TpduSize::Size128 => (TpduSize::Size128, 128),
237        TpduSize::Size256 => (TpduSize::Size256, 256),
238        TpduSize::Size512 => (TpduSize::Size512, 512),
239        TpduSize::Size1024 => (TpduSize::Size1024, 1024),
240        TpduSize::Size2048 => (TpduSize::Size2048, 2048),
241    })
242}
243
244async fn send_connection_confirm<W: TpktWriter>(writer: &mut W, source_reference: u16, destination_reference: u16, size: TpduSize, calling_tsap_id: Option<Vec<u8>>, called_tsap_id: Option<Vec<u8>>) -> Result<(), CotpError> {
245    let mut parameters = vec![CotpParameter::TpduLengthParameter(size)];
246    if let Some(tsap_id) = calling_tsap_id {
247        parameters.push(CotpParameter::CallingTsap(tsap_id));
248    }
249    if let Some(tsap_id) = called_tsap_id {
250        parameters.push(CotpParameter::CalledTsap(tsap_id));
251    }
252
253    let payload = serialise(&TransportProtocolDataUnit::CC(ConnectionConfirm::new(0, source_reference, destination_reference, ConnectionClass::Class0, vec![], parameters, &[])))?;
254    Ok(writer.send(&mut VecDeque::from_iter(vec![payload].into_iter())).await?)
255}
256
257async fn send_connection_request(writer: &mut impl TpktWriter, source_reference: u16, options: CotpProtocolInformation) -> Result<(), CotpError> {
258    let mut parameters = vec![CotpParameter::TpduLengthParameter(TpduSize::Size2048)];
259    if let Some(calling_tsap) = options.calling_tsap_id() {
260        parameters.push(CotpParameter::CallingTsap(calling_tsap.clone()));
261    }
262    if let Some(called_tsap) = options.called_tsap_id() {
263        parameters.push(CotpParameter::CalledTsap(called_tsap.clone()));
264    }
265
266    let payload = serialise(&TransportProtocolDataUnit::CR(ConnectionRequest::new(source_reference, 0, ConnectionClass::Class0, vec![], parameters, &[])))?;
267    Ok(writer.send(&mut VecDeque::from_iter(vec![payload].into_iter())).await?)
268}
269
270async fn receive_connection_confirm(reader: &mut impl TpktReader, parser: &TransportProtocolDataUnitParser) -> Result<ConnectionConfirm, CotpError> {
271    let data = match reader.recv().await {
272        Ok(Some(x)) => x,
273        Ok(None) => return Err(CotpError::ProtocolError("The connection was closed before the COTP handshake was complete.".into())),
274        Err(e) => return Err(e.into()),
275    };
276    return Ok(match parser.parse(data.as_slice())? {
277        TransportProtocolDataUnit::CC(x) if x.preferred_class() != &ConnectionClass::Class0 => return Err(CotpError::ProtocolError("Remote failed to select COTP Class 0.".into())),
278        TransportProtocolDataUnit::CC(x) => x,
279        TransportProtocolDataUnit::CR(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a connection request".into())),
280        TransportProtocolDataUnit::DR(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a disconnect reqeust".into())),
281        TransportProtocolDataUnit::DT(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a data transfer".into())),
282        TransportProtocolDataUnit::ER(_) => return Err(CotpError::ProtocolError("Expected connection confirmed on handshake but got a error response".into())),
283    });
284}