Skip to main content

rusty_cosp/service/
mod.rs

1use std::collections::VecDeque;
2
3use rusty_cotp::{CotpConnection, CotpReader, CotpRecvResult, CotpWriter};
4
5use crate::{
6    CospConnection, CospConnectionInformation, CospError, CospInitiator, CospListener, CospReader, CospRecvResult, CospResponder, CospWriter,
7    message::{CospMessage, parameters::TsduMaximumSize},
8    packet::{
9        parameters::{EnclosureField, SessionPduParameter},
10        pdu::SessionPduList,
11    },
12    service::{
13        accept::{receive_accept_with_all_user_data, send_accept},
14        connect::{SendConnectionRequestResult, send_connect_reqeust},
15        message::{MAX_PAYLOAD_SIZE, MIN_PAYLOAD_SIZE, receive_message},
16        overflow::{receive_connect_data_overflow, receive_overflow_accept, send_connect_data_overflow, send_overflow_accept},
17    },
18};
19
20pub(crate) mod accept;
21pub(crate) mod connect;
22pub(crate) mod message;
23pub(crate) mod overflow;
24
25pub struct TcpCospInitiator<R: CotpReader, W: CotpWriter> {
26    cotp_reader: R,
27    cotp_writer: W,
28    options: CospConnectionInformation,
29}
30
31impl<R: CotpReader, W: CotpWriter> TcpCospInitiator<R, W> {
32    pub async fn new(cotp_connection: impl CotpConnection, options: CospConnectionInformation) -> Result<TcpCospInitiator<impl CotpReader, impl CotpWriter>, CospError> {
33        let (cotp_reader, cotp_writer) = cotp_connection.split().await?;
34        Ok(TcpCospInitiator { cotp_reader, cotp_writer, options })
35    }
36}
37
38impl<R: CotpReader, W: CotpWriter> CospInitiator for TcpCospInitiator<R, W> {
39    // TODO Also need to handle refuse which will just generically error at the moment.
40    async fn initiate(self, user_data: Option<Vec<u8>>) -> Result<(impl CospConnection, Option<Vec<u8>>), CospError> {
41        let (mut cotp_reader, mut cotp_writer) = (self.cotp_reader, self.cotp_writer);
42
43        let send_connect_result = send_connect_reqeust(&mut cotp_writer, self.options, user_data.as_deref()).await?;
44
45        let accept_message = match (send_connect_result, user_data) {
46            (SendConnectionRequestResult::Complete, _) => receive_accept_with_all_user_data(&mut cotp_reader).await?,
47            (SendConnectionRequestResult::Overflow(sent_data), Some(user_data)) => {
48                let overflow_accept = receive_overflow_accept(&mut cotp_reader).await?;
49                send_connect_data_overflow(&mut cotp_writer, *overflow_accept.maximum_size_to_responder(), &user_data[sent_data..]).await?;
50                receive_accept_with_all_user_data(&mut cotp_reader).await?
51            }
52            (SendConnectionRequestResult::Overflow(_), None) => return Err(CospError::InternalError("User data was sent even though user data was not provided.".into())),
53        };
54
55        Ok((
56            TcpCospConnection::new(cotp_reader, cotp_writer, *accept_message.maximum_size_to_responder()),
57            accept_message.user_data().map(|data| data.clone()),
58        ))
59    }
60}
61
62pub struct TcpCospListener<R: CotpReader, W: CotpWriter> {
63    cotp_reader: R,
64    cotp_writer: W,
65    user_data: Option<Vec<u8>>,
66    cosp_connection_information: CospConnectionInformation,
67}
68
69impl<R: CotpReader, W: CotpWriter> TcpCospListener<R, W> {
70    pub async fn new(cotp_connection: impl CotpConnection) -> Result<(TcpCospListener<impl CotpReader, impl CotpWriter>, CospConnectionInformation), CospError> {
71        let (mut cotp_reader, mut cotp_writer) = cotp_connection.split().await?;
72
73        let connect_request = match receive_message(&mut cotp_reader).await? {
74            CospMessage::CN(connect_message) => connect_message,
75            message => return Err(CospError::ProtocolError(format!("Expecting a connect message, but got {}", <CospMessage as Into<&'static str>>::into(message)))),
76        };
77
78        let maximum_size_to_initiator = connect_request.maximum_size_to_initiator();
79        let has_more_data = match &connect_request.data_overflow() {
80            Some(overflow) => overflow.more_data(),
81            None => false,
82        };
83
84        let mut user_data = VecDeque::new();
85        let has_user_data = connect_request.user_data().is_some() || connect_request.data_overflow().is_some();
86        if let Some(request_user_data) = connect_request.user_data() {
87            user_data.extend(request_user_data);
88        }
89
90        if has_more_data {
91            send_overflow_accept(&mut cotp_writer, &maximum_size_to_initiator).await?;
92            user_data.extend(receive_connect_data_overflow(&mut cotp_reader).await?);
93        }
94
95        let user_data = match has_user_data {
96            true => Some(user_data.drain(..).collect()),
97            false => None,
98        };
99        let cosp_connection_information = CospConnectionInformation {
100            tsdu_maximum_size: if let TsduMaximumSize::Size(x) = maximum_size_to_initiator { Some(*x) } else { None },
101            called_session_selector: connect_request.called_session_selector().map(|x| x.clone()),
102            calling_session_selector: connect_request.calling_session_selector().map(|x| x.clone()),
103        };
104        Ok((
105            TcpCospListener {
106                cotp_reader,
107                cotp_writer,
108                user_data,
109                cosp_connection_information: cosp_connection_information.clone(),
110            },
111            cosp_connection_information,
112        ))
113    }
114}
115
116impl<R: CotpReader, W: CotpWriter> CospListener for TcpCospListener<R, W> {
117    async fn responder(self) -> Result<(impl CospResponder, CospConnectionInformation, Option<Vec<u8>>), CospError> {
118        let cotp_reader = self.cotp_reader;
119        let cotp_writer = self.cotp_writer;
120
121        let maximum_size_to_initiator = match self.cosp_connection_information.tsdu_maximum_size {
122            Some(x) => TsduMaximumSize::Size(x),
123            None => TsduMaximumSize::Unlimited,
124        };
125
126        Ok((TcpCospResponder::<R, W>::new(cotp_reader, cotp_writer, maximum_size_to_initiator), self.cosp_connection_information, self.user_data))
127    }
128}
129
130pub struct TcpCospResponder<R: CotpReader, W: CotpWriter> {
131    cotp_reader: R,
132    cotp_writer: W,
133    maximum_size_to_initiator: TsduMaximumSize,
134}
135
136impl<R: CotpReader, W: CotpWriter> TcpCospResponder<R, W> {
137    fn new(cotp_reader: impl CotpReader, cotp_writer: impl CotpWriter, maximum_size_to_initiator: TsduMaximumSize) -> TcpCospResponder<impl CotpReader, impl CotpWriter> {
138        TcpCospResponder {
139            cotp_reader,
140            cotp_writer,
141            maximum_size_to_initiator,
142        }
143    }
144}
145
146impl<R: CotpReader, W: CotpWriter> CospResponder for TcpCospResponder<R, W> {
147    async fn accept(self, accept_data: Option<Vec<u8>>) -> Result<impl CospConnection, CospError> {
148        let cotp_reader = self.cotp_reader;
149        let mut cotp_writer = self.cotp_writer;
150
151        send_accept(&mut cotp_writer, &self.maximum_size_to_initiator, accept_data).await?;
152        Ok(TcpCospConnection::new(cotp_reader, cotp_writer, self.maximum_size_to_initiator))
153    }
154}
155
156pub struct TcpCospConnection<R: CotpReader, W: CotpWriter> {
157    cotp_reader: R,
158    cotp_writer: W,
159    remote_max_size: TsduMaximumSize,
160}
161
162impl<R: CotpReader, W: CotpWriter> TcpCospConnection<R, W> {
163    fn new(cotp_reader: R, cotp_writer: W, remote_max_size: TsduMaximumSize) -> TcpCospConnection<impl CotpReader, impl CotpWriter> {
164        TcpCospConnection { cotp_reader, cotp_writer, remote_max_size }
165    }
166}
167
168impl<R: CotpReader, W: CotpWriter> CospConnection for TcpCospConnection<R, W> {
169    async fn split(self) -> Result<(impl CospReader, impl CospWriter), CospError> {
170        Ok((
171            TcpCospReader {
172                cotp_reader: self.cotp_reader,
173                buffer: VecDeque::new(),
174            },
175            TcpCospWriter {
176                cotp_writer: self.cotp_writer,
177                remote_max_size: self.remote_max_size,
178            },
179        ))
180    }
181}
182
183pub struct TcpCospReader<R: CotpReader> {
184    cotp_reader: R,
185    buffer: VecDeque<u8>,
186}
187
188impl<R: CotpReader> CospReader for TcpCospReader<R> {
189    async fn recv(&mut self) -> Result<CospRecvResult, CospError> {
190        loop {
191            let receive_result = self.cotp_reader.recv().await?;
192            let data = match receive_result {
193                CotpRecvResult::Closed => return Ok(CospRecvResult::Closed),
194                CotpRecvResult::Data(data) => data,
195            };
196
197            let received_message = CospMessage::from_spdu_list(SessionPduList::deserialise(&data)?)?;
198            let data_transfer_message = match received_message {
199                CospMessage::DT(message) => message,
200                _ => todo!(),
201            };
202
203            let enclosure = data_transfer_message.enclosure();
204            self.buffer.extend(data_transfer_message.take_user_information());
205            match enclosure {
206                Some(x) if x.end() => return Ok(CospRecvResult::Data(self.buffer.drain(..).collect())),
207                None => return Ok(CospRecvResult::Data(self.buffer.drain(..).collect())),
208                Some(_) => (),
209            }
210        }
211    }
212}
213
214pub struct TcpCospWriter<W: CotpWriter> {
215    cotp_writer: W,
216    remote_max_size: TsduMaximumSize,
217}
218
219impl<W: CotpWriter> CospWriter for TcpCospWriter<W> {
220    async fn send(&mut self, data: &[u8]) -> Result<(), CospError> {
221        const HEADER_LENGTH_WITHOUT_ENCLOSURE: usize = 4; // GT + DT
222
223        match self.remote_max_size {
224            TsduMaximumSize::Size(x) if data.len() < MAX_PAYLOAD_SIZE && data.len() + HEADER_LENGTH_WITHOUT_ENCLOSURE < x as usize => {
225                let payload = SessionPduList::new(vec![SessionPduParameter::GiveTokens(), SessionPduParameter::DataTransfer(vec![])], data.to_vec()).serialise()?;
226                self.cotp_writer.send(&payload).await?;
227            }
228            TsduMaximumSize::Unlimited => {
229                let payload = SessionPduList::new(vec![SessionPduParameter::GiveTokens(), SessionPduParameter::DataTransfer(vec![])], data.to_vec()).serialise()?;
230                self.cotp_writer.send(&payload).await?;
231            }
232            TsduMaximumSize::Size(x) => {
233                let mut cursor: usize = 0;
234                let payload_length = usize::max(MIN_PAYLOAD_SIZE, usize::min(MAX_PAYLOAD_SIZE, x as usize));
235
236                while cursor < data.len() {
237                    let start = cursor;
238                    cursor = match cursor + payload_length as usize {
239                        cursor if cursor > data.len() => data.len(),
240                        cursor => cursor,
241                    };
242                    let enclosure = EnclosureField(if start == 0 { 1 } else { 0 } + if cursor == data.len() { 2 } else { 0 });
243                    let payload = SessionPduList::new(
244                        vec![SessionPduParameter::GiveTokens(), SessionPduParameter::DataTransfer(vec![SessionPduParameter::EnclosureParameter(enclosure)])],
245                        data[start..cursor].to_vec(),
246                    )
247                    .serialise()?;
248                    self.cotp_writer.send(&payload).await?;
249                }
250            }
251        }
252        Ok(())
253    }
254
255    async fn continue_send(&mut self) -> Result<(), CospError> {
256        Ok(())
257    }
258}