satrs_core/hal/std/tcp_server.rs
1//! Generic TCP TMTC servers with different TMTC format flavours.
2use alloc::vec;
3use alloc::vec::Vec;
4use core::time::Duration;
5use socket2::{Domain, Socket, Type};
6use std::io::Read;
7use std::net::TcpListener;
8use std::net::{SocketAddr, TcpStream};
9use std::thread;
10
11use crate::tmtc::{ReceivesTc, TmPacketSource};
12use thiserror::Error;
13
14// Re-export the TMTC in COBS server.
15pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
16pub use crate::hal::std::tcp_spacepackets_server::{
17 SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer,
18};
19
20/// Configuration struct for the generic TCP TMTC server
21///
22/// ## Parameters
23///
24/// * `addr` - Address of the TCP server.
25/// * `inner_loop_delay` - If a client connects for a longer period, but no TC is received or
26/// no TM needs to be sent, the TCP server will delay for the specified amount of time
27/// to reduce CPU load.
28/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and
29/// encoding of that data. This buffer should at large enough to hold the maximum expected
30/// TM size read from the packet source.
31/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from
32/// the client. It is recommended to make this buffer larger to allow reading multiple
33/// consecutive packets as well, for example by using common buffer sizes like 4096 or 8192
34/// byte. The buffer should at the very least be large enough to hold the maximum expected
35/// telecommand size.
36/// * `reuse_addr` - Can be used to set the `SO_REUSEADDR` option on the raw socket. This is
37/// especially useful if the address and port are static for the server. Set to false by
38/// default.
39/// * `reuse_port` - Can be used to set the `SO_REUSEPORT` option on the raw socket. This is
40/// especially useful if the address and port are static for the server. Set to false by
41/// default.
42#[derive(Debug, Copy, Clone)]
43pub struct ServerConfig {
44 pub addr: SocketAddr,
45 pub inner_loop_delay: Duration,
46 pub tm_buffer_size: usize,
47 pub tc_buffer_size: usize,
48 pub reuse_addr: bool,
49 pub reuse_port: bool,
50}
51
52impl ServerConfig {
53 pub fn new(
54 addr: SocketAddr,
55 inner_loop_delay: Duration,
56 tm_buffer_size: usize,
57 tc_buffer_size: usize,
58 ) -> Self {
59 Self {
60 addr,
61 inner_loop_delay,
62 tm_buffer_size,
63 tc_buffer_size,
64 reuse_addr: false,
65 reuse_port: false,
66 }
67 }
68}
69
70#[derive(Error, Debug)]
71pub enum TcpTmtcError<TmError, TcError> {
72 #[error("TM retrieval error: {0}")]
73 TmError(TmError),
74 #[error("TC retrieval error: {0}")]
75 TcError(TcError),
76 #[error("io error: {0}")]
77 Io(#[from] std::io::Error),
78}
79
80/// Result of one connection attempt. Contains the client address if a connection was established,
81/// in addition to the number of telecommands and telemetry packets exchanged.
82#[derive(Debug, Default)]
83pub struct ConnectionResult {
84 pub addr: Option<SocketAddr>,
85 pub num_received_tcs: u32,
86 pub num_sent_tms: u32,
87}
88
89/// Generic parser abstraction for an object which can parse for telecommands given a raw
90/// bytestream received from a TCP socket and send them to a generic [ReceivesTc] telecommand
91/// receiver. This allows different encoding schemes for telecommands.
92pub trait TcpTcParser<TmError, TcError> {
93 fn handle_tc_parsing(
94 &mut self,
95 tc_buffer: &mut [u8],
96 tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
97 conn_result: &mut ConnectionResult,
98 current_write_idx: usize,
99 next_write_idx: &mut usize,
100 ) -> Result<(), TcpTmtcError<TmError, TcError>>;
101}
102
103/// Generic sender abstraction for an object which can pull telemetry from a given TM source
104/// using a [TmPacketSource] and then send them back to a client using a given [TcpStream].
105/// The concrete implementation can also perform any encoding steps which are necessary before
106/// sending back the data to a client.
107pub trait TcpTmSender<TmError, TcError> {
108 fn handle_tm_sending(
109 &mut self,
110 tm_buffer: &mut [u8],
111 tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
112 conn_result: &mut ConnectionResult,
113 stream: &mut TcpStream,
114 ) -> Result<bool, TcpTmtcError<TmError, TcError>>;
115}
116
117/// TCP TMTC server implementation for exchange of generic TMTC packets in a generic way which
118/// stays agnostic to the encoding scheme and format used for both telecommands and telemetry.
119///
120/// This server implements a generic TMTC handling logic and allows modifying its behaviour
121/// through the following 4 core abstractions:
122///
123/// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client.
124/// 2. Parsed telecommands will be sent to the [ReceivesTc] telecommand receiver.
125/// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client.
126/// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender].
127///
128/// It is possible to specify custom abstractions to build a dedicated TCP TMTC server without
129/// having to re-implement common logic.
130///
131/// Currently, this framework offers the following concrete implementations:
132///
133/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
134pub struct TcpTmtcGenericServer<
135 TmError,
136 TcError,
137 TmSource: TmPacketSource<Error = TmError>,
138 TcReceiver: ReceivesTc<Error = TcError>,
139 TmSender: TcpTmSender<TmError, TcError>,
140 TcParser: TcpTcParser<TmError, TcError>,
141> {
142 pub(crate) listener: TcpListener,
143 pub(crate) inner_loop_delay: Duration,
144 pub(crate) tm_source: TmSource,
145 pub(crate) tm_buffer: Vec<u8>,
146 pub(crate) tc_receiver: TcReceiver,
147 pub(crate) tc_buffer: Vec<u8>,
148 tc_handler: TcParser,
149 tm_handler: TmSender,
150}
151
152impl<
153 TmError: 'static,
154 TcError: 'static,
155 TmSource: TmPacketSource<Error = TmError>,
156 TcReceiver: ReceivesTc<Error = TcError>,
157 TmSender: TcpTmSender<TmError, TcError>,
158 TcParser: TcpTcParser<TmError, TcError>,
159 > TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, TmSender, TcParser>
160{
161 /// Create a new generic TMTC server instance.
162 ///
163 /// ## Parameter
164 ///
165 /// * `cfg` - Configuration of the server.
166 /// * `tc_parser` - Parser which extracts telecommands from the raw bytestream received from
167 /// the client.
168 /// * `tm_sender` - Sends back telemetry to the client using the specified TM source.
169 /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
170 /// then sent back to the client.
171 /// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
172 /// to this TC receiver.
173 pub fn new(
174 cfg: ServerConfig,
175 tc_parser: TcParser,
176 tm_sender: TmSender,
177 tm_source: TmSource,
178 tc_receiver: TcReceiver,
179 ) -> Result<Self, std::io::Error> {
180 // Create a TCP listener bound to two addresses.
181 let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
182 socket.set_reuse_address(cfg.reuse_addr)?;
183 #[cfg(unix)]
184 socket.set_reuse_port(cfg.reuse_port)?;
185 let addr = (cfg.addr).into();
186 socket.bind(&addr)?;
187 socket.listen(128)?;
188 Ok(Self {
189 tc_handler: tc_parser,
190 tm_handler: tm_sender,
191 listener: socket.into(),
192 inner_loop_delay: cfg.inner_loop_delay,
193 tm_source,
194 tm_buffer: vec![0; cfg.tm_buffer_size],
195 tc_receiver,
196 tc_buffer: vec![0; cfg.tc_buffer_size],
197 })
198 }
199
200 /// Retrieve the internal [TcpListener] class.
201 pub fn listener(&mut self) -> &mut TcpListener {
202 &mut self.listener
203 }
204
205 /// Can be used to retrieve the local assigned address of the TCP server. This is especially
206 /// useful if using the port number 0 for OS auto-assignment.
207 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
208 self.listener.local_addr()
209 }
210
211 /// This call is used to handle the next connection to a client. Right now, it performs
212 /// the following steps:
213 ///
214 /// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
215 /// until a client connects.
216 /// 2. It reads all the telecommands from the client and parses all received data using the
217 /// user specified [TcpTcParser].
218 /// 3. After reading and parsing all telecommands, it sends back all telemetry using the
219 /// user specified [TcpTmSender].
220 ///
221 /// The server will delay for a user-specified period if the client connects to the server
222 /// for prolonged periods and there is no traffic for the server. This is the case if the
223 /// client does not send any telecommands and no telemetry needs to be sent back to the client.
224 pub fn handle_next_connection(
225 &mut self,
226 ) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
227 let mut connection_result = ConnectionResult::default();
228 let mut current_write_idx;
229 let mut next_write_idx = 0;
230 let (mut stream, addr) = self.listener.accept()?;
231 stream.set_nonblocking(true)?;
232 connection_result.addr = Some(addr);
233 current_write_idx = next_write_idx;
234 loop {
235 let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
236 match read_result {
237 Ok(0) => {
238 // Connection closed by client. If any TC was read, parse for complete packets.
239 // After that, break the outer loop.
240 if current_write_idx > 0 {
241 self.tc_handler.handle_tc_parsing(
242 &mut self.tc_buffer,
243 &mut self.tc_receiver,
244 &mut connection_result,
245 current_write_idx,
246 &mut next_write_idx,
247 )?;
248 }
249 break;
250 }
251 Ok(read_len) => {
252 current_write_idx += read_len;
253 // TC buffer is full, we must parse for complete packets now.
254 if current_write_idx == self.tc_buffer.capacity() {
255 self.tc_handler.handle_tc_parsing(
256 &mut self.tc_buffer,
257 &mut self.tc_receiver,
258 &mut connection_result,
259 current_write_idx,
260 &mut next_write_idx,
261 )?;
262 current_write_idx = next_write_idx;
263 }
264 }
265 Err(e) => match e.kind() {
266 // As per [TcpStream::set_read_timeout] documentation, this should work for
267 // both UNIX and Windows.
268 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
269 self.tc_handler.handle_tc_parsing(
270 &mut self.tc_buffer,
271 &mut self.tc_receiver,
272 &mut connection_result,
273 current_write_idx,
274 &mut next_write_idx,
275 )?;
276 current_write_idx = next_write_idx;
277
278 if !self.tm_handler.handle_tm_sending(
279 &mut self.tm_buffer,
280 &mut self.tm_source,
281 &mut connection_result,
282 &mut stream,
283 )? {
284 // No TC read, no TM was sent, but the client has not disconnected.
285 // Perform an inner delay to avoid burning CPU time.
286 thread::sleep(self.inner_loop_delay);
287 }
288 }
289 _ => {
290 return Err(TcpTmtcError::Io(e));
291 }
292 },
293 }
294 }
295 self.tm_handler.handle_tm_sending(
296 &mut self.tm_buffer,
297 &mut self.tm_source,
298 &mut connection_result,
299 &mut stream,
300 )?;
301 Ok(connection_result)
302 }
303}
304
305#[cfg(test)]
306pub(crate) mod tests {
307 use std::sync::Mutex;
308
309 use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
310
311 use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
312
313 #[derive(Default, Clone)]
314 pub(crate) struct SyncTcCacher {
315 pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
316 }
317 impl ReceivesTcCore for SyncTcCacher {
318 type Error = ();
319
320 fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
321 let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
322 tc_queue.push_back(tc_raw.to_vec());
323 Ok(())
324 }
325 }
326
327 #[derive(Default, Clone)]
328 pub(crate) struct SyncTmSource {
329 tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
330 }
331
332 impl SyncTmSource {
333 pub(crate) fn add_tm(&mut self, tm: &[u8]) {
334 let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
335 tm_queue.push_back(tm.to_vec());
336 }
337 }
338
339 impl TmPacketSourceCore for SyncTmSource {
340 type Error = ();
341
342 fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
343 let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
344 if !tm_queue.is_empty() {
345 let next_vec = tm_queue.front().unwrap();
346 if buffer.len() < next_vec.len() {
347 panic!(
348 "provided buffer too small, must be at least {} bytes",
349 next_vec.len()
350 );
351 }
352 let next_vec = tm_queue.pop_front().unwrap();
353 buffer[0..next_vec.len()].copy_from_slice(&next_vec);
354 return Ok(next_vec.len());
355 }
356 Ok(0)
357 }
358 }
359}