1use std::collections::VecDeque;
2
3use rusty_cotp::{CotpConnection, CotpReader, CotpRecvResult, CotpWriter};
4
5use crate::{
6 CospConnection, CospConnectionInformation, CospError, CospInitiator, CospListener, CospReader, CospRecvResult, CospResponder, CospWriter,
7 message::{CospMessage, parameters::TsduMaximumSize},
8 packet::{
9 parameters::{EnclosureField, SessionPduParameter},
10 pdu::SessionPduList,
11 },
12 service::{
13 accept::{receive_accept_with_all_user_data, send_accept},
14 connect::{SendConnectionRequestResult, send_connect_reqeust},
15 message::{MAX_PAYLOAD_SIZE, MIN_PAYLOAD_SIZE, receive_message},
16 overflow::{receive_connect_data_overflow, receive_overflow_accept, send_connect_data_overflow, send_overflow_accept},
17 },
18};
19
20pub(crate) mod accept;
21pub(crate) mod connect;
22pub(crate) mod message;
23pub(crate) mod overflow;
24
25pub struct TcpCospInitiator<R: CotpReader, W: CotpWriter> {
26 cotp_reader: R,
27 cotp_writer: W,
28 options: CospConnectionInformation,
29}
30
31impl<R: CotpReader, W: CotpWriter> TcpCospInitiator<R, W> {
32 pub async fn new(cotp_connection: impl CotpConnection, options: CospConnectionInformation) -> Result<TcpCospInitiator<impl CotpReader, impl CotpWriter>, CospError> {
33 let (cotp_reader, cotp_writer) = cotp_connection.split().await?;
34 Ok(TcpCospInitiator { cotp_reader, cotp_writer, options })
35 }
36}
37
38impl<R: CotpReader, W: CotpWriter> CospInitiator for TcpCospInitiator<R, W> {
39 async fn initiate(self, user_data: Option<Vec<u8>>) -> Result<(impl CospConnection, Option<Vec<u8>>), CospError> {
41 let (mut cotp_reader, mut cotp_writer) = (self.cotp_reader, self.cotp_writer);
42
43 let send_connect_result = send_connect_reqeust(&mut cotp_writer, self.options, user_data.as_deref()).await?;
44
45 let accept_message = match (send_connect_result, user_data) {
46 (SendConnectionRequestResult::Complete, _) => receive_accept_with_all_user_data(&mut cotp_reader).await?,
47 (SendConnectionRequestResult::Overflow(sent_data), Some(user_data)) => {
48 let overflow_accept = receive_overflow_accept(&mut cotp_reader).await?;
49 send_connect_data_overflow(&mut cotp_writer, *overflow_accept.maximum_size_to_responder(), &user_data[sent_data..]).await?;
50 receive_accept_with_all_user_data(&mut cotp_reader).await?
51 }
52 (SendConnectionRequestResult::Overflow(_), None) => return Err(CospError::InternalError("User data was sent even though user data was not provided.".into())),
53 };
54
55 Ok((
56 TcpCospConnection::new(cotp_reader, cotp_writer, *accept_message.maximum_size_to_responder()),
57 accept_message.user_data().map(|data| data.clone()),
58 ))
59 }
60}
61
62pub struct TcpCospListener<R: CotpReader, W: CotpWriter> {
63 cotp_reader: R,
64 cotp_writer: W,
65 user_data: Option<Vec<u8>>,
66 cosp_connection_information: CospConnectionInformation,
67}
68
69impl<R: CotpReader, W: CotpWriter> TcpCospListener<R, W> {
70 pub async fn new(cotp_connection: impl CotpConnection) -> Result<(TcpCospListener<impl CotpReader, impl CotpWriter>, CospConnectionInformation), CospError> {
71 let (mut cotp_reader, mut cotp_writer) = cotp_connection.split().await?;
72
73 let connect_request = match receive_message(&mut cotp_reader).await? {
74 CospMessage::CN(connect_message) => connect_message,
75 message => return Err(CospError::ProtocolError(format!("Expecting a connect message, but got {}", <CospMessage as Into<&'static str>>::into(message)))),
76 };
77
78 let maximum_size_to_initiator = connect_request.maximum_size_to_initiator();
79 let has_more_data = match &connect_request.data_overflow() {
80 Some(overflow) => overflow.more_data(),
81 None => false,
82 };
83
84 let mut user_data = VecDeque::new();
85 let has_user_data = connect_request.user_data().is_some() || connect_request.data_overflow().is_some();
86 if let Some(request_user_data) = connect_request.user_data() {
87 user_data.extend(request_user_data);
88 }
89
90 if has_more_data {
91 send_overflow_accept(&mut cotp_writer, &maximum_size_to_initiator).await?;
92 user_data.extend(receive_connect_data_overflow(&mut cotp_reader).await?);
93 }
94
95 let user_data = match has_user_data {
96 true => Some(user_data.drain(..).collect()),
97 false => None,
98 };
99 let cosp_connection_information = CospConnectionInformation {
100 tsdu_maximum_size: if let TsduMaximumSize::Size(x) = maximum_size_to_initiator { Some(*x) } else { None },
101 called_session_selector: connect_request.called_session_selector().map(|x| x.clone()),
102 calling_session_selector: connect_request.calling_session_selector().map(|x| x.clone()),
103 };
104 Ok((
105 TcpCospListener {
106 cotp_reader,
107 cotp_writer,
108 user_data,
109 cosp_connection_information: cosp_connection_information.clone(),
110 },
111 cosp_connection_information,
112 ))
113 }
114}
115
116impl<R: CotpReader, W: CotpWriter> CospListener for TcpCospListener<R, W> {
117 async fn responder(self) -> Result<(impl CospResponder, CospConnectionInformation, Option<Vec<u8>>), CospError> {
118 let cotp_reader = self.cotp_reader;
119 let cotp_writer = self.cotp_writer;
120
121 let maximum_size_to_initiator = match self.cosp_connection_information.tsdu_maximum_size {
122 Some(x) => TsduMaximumSize::Size(x),
123 None => TsduMaximumSize::Unlimited,
124 };
125
126 Ok((TcpCospResponder::<R, W>::new(cotp_reader, cotp_writer, maximum_size_to_initiator), self.cosp_connection_information, self.user_data))
127 }
128}
129
130pub struct TcpCospResponder<R: CotpReader, W: CotpWriter> {
131 cotp_reader: R,
132 cotp_writer: W,
133 maximum_size_to_initiator: TsduMaximumSize,
134}
135
136impl<R: CotpReader, W: CotpWriter> TcpCospResponder<R, W> {
137 fn new(cotp_reader: impl CotpReader, cotp_writer: impl CotpWriter, maximum_size_to_initiator: TsduMaximumSize) -> TcpCospResponder<impl CotpReader, impl CotpWriter> {
138 TcpCospResponder {
139 cotp_reader,
140 cotp_writer,
141 maximum_size_to_initiator,
142 }
143 }
144}
145
146impl<R: CotpReader, W: CotpWriter> CospResponder for TcpCospResponder<R, W> {
147 async fn accept(self, accept_data: Option<Vec<u8>>) -> Result<impl CospConnection, CospError> {
148 let cotp_reader = self.cotp_reader;
149 let mut cotp_writer = self.cotp_writer;
150
151 send_accept(&mut cotp_writer, &self.maximum_size_to_initiator, accept_data).await?;
152 Ok(TcpCospConnection::new(cotp_reader, cotp_writer, self.maximum_size_to_initiator))
153 }
154}
155
156pub struct TcpCospConnection<R: CotpReader, W: CotpWriter> {
157 cotp_reader: R,
158 cotp_writer: W,
159 remote_max_size: TsduMaximumSize,
160}
161
162impl<R: CotpReader, W: CotpWriter> TcpCospConnection<R, W> {
163 fn new(cotp_reader: R, cotp_writer: W, remote_max_size: TsduMaximumSize) -> TcpCospConnection<impl CotpReader, impl CotpWriter> {
164 TcpCospConnection { cotp_reader, cotp_writer, remote_max_size }
165 }
166}
167
168impl<R: CotpReader, W: CotpWriter> CospConnection for TcpCospConnection<R, W> {
169 async fn split(self) -> Result<(impl CospReader, impl CospWriter), CospError> {
170 Ok((
171 TcpCospReader {
172 cotp_reader: self.cotp_reader,
173 buffer: VecDeque::new(),
174 },
175 TcpCospWriter {
176 cotp_writer: self.cotp_writer,
177 remote_max_size: self.remote_max_size,
178 },
179 ))
180 }
181}
182
183pub struct TcpCospReader<R: CotpReader> {
184 cotp_reader: R,
185 buffer: VecDeque<u8>,
186}
187
188impl<R: CotpReader> CospReader for TcpCospReader<R> {
189 async fn recv(&mut self) -> Result<CospRecvResult, CospError> {
190 loop {
191 let receive_result = self.cotp_reader.recv().await?;
192 let data = match receive_result {
193 CotpRecvResult::Closed => return Ok(CospRecvResult::Closed),
194 CotpRecvResult::Data(data) => data,
195 };
196
197 let received_message = CospMessage::from_spdu_list(SessionPduList::deserialise(&data)?)?;
198 let data_transfer_message = match received_message {
199 CospMessage::DT(message) => message,
200 _ => todo!(),
201 };
202
203 let enclosure = data_transfer_message.enclosure();
204 self.buffer.extend(data_transfer_message.take_user_information());
205 match enclosure {
206 Some(x) if x.end() => return Ok(CospRecvResult::Data(self.buffer.drain(..).collect())),
207 None => return Ok(CospRecvResult::Data(self.buffer.drain(..).collect())),
208 Some(_) => (),
209 }
210 }
211 }
212}
213
214pub struct TcpCospWriter<W: CotpWriter> {
215 cotp_writer: W,
216 remote_max_size: TsduMaximumSize,
217}
218
219impl<W: CotpWriter> CospWriter for TcpCospWriter<W> {
220 async fn send(&mut self, data: &[u8]) -> Result<(), CospError> {
221 const HEADER_LENGTH_WITHOUT_ENCLOSURE: usize = 4; match self.remote_max_size {
224 TsduMaximumSize::Size(x) if data.len() < MAX_PAYLOAD_SIZE && data.len() + HEADER_LENGTH_WITHOUT_ENCLOSURE < x as usize => {
225 let payload = SessionPduList::new(vec![SessionPduParameter::GiveTokens(), SessionPduParameter::DataTransfer(vec![])], data.to_vec()).serialise()?;
226 self.cotp_writer.send(&payload).await?;
227 }
228 TsduMaximumSize::Unlimited => {
229 let payload = SessionPduList::new(vec![SessionPduParameter::GiveTokens(), SessionPduParameter::DataTransfer(vec![])], data.to_vec()).serialise()?;
230 self.cotp_writer.send(&payload).await?;
231 }
232 TsduMaximumSize::Size(x) => {
233 let mut cursor: usize = 0;
234 let payload_length = usize::max(MIN_PAYLOAD_SIZE, usize::min(MAX_PAYLOAD_SIZE, x as usize));
235
236 while cursor < data.len() {
237 let start = cursor;
238 cursor = match cursor + payload_length as usize {
239 cursor if cursor > data.len() => data.len(),
240 cursor => cursor,
241 };
242 let enclosure = EnclosureField(if start == 0 { 1 } else { 0 } + if cursor == data.len() { 2 } else { 0 });
243 let payload = SessionPduList::new(
244 vec![SessionPduParameter::GiveTokens(), SessionPduParameter::DataTransfer(vec![SessionPduParameter::EnclosureParameter(enclosure)])],
245 data[start..cursor].to_vec(),
246 )
247 .serialise()?;
248 self.cotp_writer.send(&payload).await?;
249 }
250 }
251 }
252 Ok(())
253 }
254
255 async fn continue_send(&mut self) -> Result<(), CospError> {
256 Ok(())
257 }
258}