satrs 0.2.1

A framework to build software for remote systems
//! Generic TCP TMTC servers with different TMTC format flavours.
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use core::sync::atomic::AtomicBool;
use core::time::Duration;
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Socket, Type};
use std::io::{self, Read};
use std::net::SocketAddr;
use std::thread;

use crate::tmtc::{PacketSenderRaw, PacketSource};
use crate::ComponentId;
use thiserror::Error;

// Re-export the COBS and space packets TMTC server components.
pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
pub use crate::hal::std::tcp_spacepackets_server::{SpacepacketsTmSender, TcpSpacepacketsServer};

/// Configuration struct for the generic TCP TMTC server
///
/// ## Parameters
///
/// * `addr` - Address of the TCP server.
/// * `inner_loop_delay` - If a client stays connected for a longer period, but no TC is received
///     and no TM needs to be sent, the TCP server will delay for the specified amount of time
///     to reduce CPU load.
/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [PacketSource] and
///     for the encoding of that data. This buffer should be large enough to hold the maximum
///     expected TM size read from the packet source.
/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from
///     the client. It is recommended to make this buffer larger to allow reading multiple
///     consecutive packets as well, for example by using common buffer sizes like 4096 or
///     8192 bytes. The buffer should at the very least be large enough to hold the maximum
///     expected telecommand size.
/// * `reuse_addr` - Can be used to set the `SO_REUSEADDR` option on the raw socket. This is
///     especially useful if the address and port are static for the server. Enabled by default
///     by the [Self::new] constructor.
/// * `reuse_port` - Can be used to set the `SO_REUSEPORT` option on the raw socket. This is
///     especially useful if the address and port are static for the server. Enabled by default
///     by the [Self::new] constructor.
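///
/// ## Example
///
/// A minimal sketch of building a configuration. The component ID, address, delay and buffer
/// sizes below are arbitrary example values, and the module path in the import is assumed to
/// be `satrs::hal::std::tcp_server`.
///
/// ```ignore
/// use core::time::Duration;
/// use std::net::SocketAddr;
///
/// use satrs::hal::std::tcp_server::ServerConfig;
///
/// let addr: SocketAddr = "127.0.0.1:7301".parse().unwrap();
/// let cfg = ServerConfig::new(
///     1,                           // component ID of the server
///     addr,                        // listening address
///     Duration::from_millis(200),  // inner loop delay when there is no traffic
///     4096,                        // TM buffer size
///     8192,                        // TC buffer size
/// );
/// // The constructor enables both socket reuse options by default.
/// assert!(cfg.reuse_addr && cfg.reuse_port);
/// ```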
#[derive(Debug, Copy, Clone)]
pub struct ServerConfig {
    pub id: ComponentId,
    pub addr: SocketAddr,
    pub inner_loop_delay: Duration,
    pub tm_buffer_size: usize,
    pub tc_buffer_size: usize,
    pub reuse_addr: bool,
    pub reuse_port: bool,
}

impl ServerConfig {
    pub fn new(
        id: ComponentId,
        addr: SocketAddr,
        inner_loop_delay: Duration,
        tm_buffer_size: usize,
        tc_buffer_size: usize,
    ) -> Self {
        Self {
            id,
            addr,
            inner_loop_delay,
            tm_buffer_size,
            tc_buffer_size,
            reuse_addr: true,
            reuse_port: true,
        }
    }
}

#[derive(Error, Debug)]
pub enum TcpTmtcError<TmError, TcError> {
    #[error("TM retrieval error: {0}")]
    TmError(TmError),
    #[error("TC retrieval error: {0}")]
    TcError(TcError),
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}

/// Result of one call to [TcpTmtcGenericServer::handle_all_connections]. Either no connection
/// was accepted within the poll timeout, or the number of handled connections is returned.
#[derive(Debug, PartialEq, Eq)]
pub enum ConnectionResult {
    AcceptTimeout,
    HandledConnections(u32),
}

#[derive(Debug)]
pub struct HandledConnectionInfo {
    pub addr: SocketAddr,
    pub num_received_tcs: u32,
    pub num_sent_tms: u32,
    /// The generic TCP server can be stopped using an external signal. If this happened, this
    /// boolean will be set to true.
    pub stopped_by_signal: bool,
}

impl HandledConnectionInfo {
    pub fn new(addr: SocketAddr) -> Self {
        Self {
            addr,
            num_received_tcs: 0,
            num_sent_tms: 0,
            stopped_by_signal: false,
        }
    }
}

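/// Helper trait which is called by the generic server each time a client connection was handled,
/// for example to log or track connection statistics. The following sketch shows a hypothetical
/// logging implementation:
///
/// ```ignore
/// struct ConnectionLogger;
///
/// impl HandledConnectionHandler for ConnectionLogger {
///     fn handled_connection(&mut self, info: HandledConnectionInfo) {
///         println!(
///             "client {} sent {} TC(s) and received {} TM(s)",
///             info.addr, info.num_received_tcs, info.num_sent_tms
///         );
///     }
/// }
/// ```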
pub trait HandledConnectionHandler {
    fn handled_connection(&mut self, info: HandledConnectionInfo);
}

/// Generic parser abstraction for an object which can parse telecommands from a raw bytestream
/// received from a TCP socket and send them on using a generic [PacketSenderRaw]
/// implementation. This allows different encoding schemes for telecommands.
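///
/// The following sketch shows a trivial parser which treats all bytes read so far as one
/// complete telecommand; it assumes [PacketSenderRaw::send_packet] is the forwarding call.
/// A real parser would scan for packet boundaries and use `next_write_idx` to keep incomplete
/// trailing bytes in the buffer.
///
/// ```ignore
/// struct PassthroughTcParser;
///
/// impl<TmError, SendError> TcpTcParser<TmError, SendError> for PassthroughTcParser {
///     fn handle_tc_parsing(
///         &mut self,
///         tc_buffer: &mut [u8],
///         sender_id: ComponentId,
///         tc_sender: &(impl PacketSenderRaw<Error = SendError> + ?Sized),
///         conn_result: &mut HandledConnectionInfo,
///         current_write_idx: usize,
///         next_write_idx: &mut usize,
///     ) -> Result<(), TcpTmtcError<TmError, SendError>> {
///         if current_write_idx > 0 {
///             // Forward everything read so far as a single raw telecommand packet.
///             tc_sender
///                 .send_packet(sender_id, &tc_buffer[..current_write_idx])
///                 .map_err(TcpTmtcError::TcError)?;
///             conn_result.num_received_tcs += 1;
///         }
///         // All bytes were consumed, so the next read starts at the beginning of the buffer.
///         *next_write_idx = 0;
///         Ok(())
///     }
/// }
/// ```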
pub trait TcpTcParser<TmError, SendError> {
    fn handle_tc_parsing(
        &mut self,
        tc_buffer: &mut [u8],
        sender_id: ComponentId,
        tc_sender: &(impl PacketSenderRaw<Error = SendError> + ?Sized),
        conn_result: &mut HandledConnectionInfo,
        current_write_idx: usize,
        next_write_idx: &mut usize,
    ) -> Result<(), TcpTmtcError<TmError, SendError>>;
}

/// Generic sender abstraction for an object which can pull telemetry from a given TM source
/// using a [PacketSource] and then send them back to a client using a given [TcpStream].
/// The concrete implementation can also perform any encoding steps which are necessary before
/// sending back the data to a client.
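///
/// A minimal sketch of a sender which writes each pulled TM packet to the stream without any
/// additional framing; concrete implementations will usually add an encoding step:
///
/// ```ignore
/// use std::io::Write;
///
/// struct RawTmSender;
///
/// impl<TmError, TcError> TcpTmSender<TmError, TcError> for RawTmSender {
///     fn handle_tm_sending(
///         &mut self,
///         tm_buffer: &mut [u8],
///         tm_source: &mut (impl PacketSource<Error = TmError> + ?Sized),
///         conn_result: &mut HandledConnectionInfo,
///         stream: &mut TcpStream,
///     ) -> Result<bool, TcpTmtcError<TmError, TcError>> {
///         let mut tm_was_sent = false;
///         loop {
///             // Pull the next TM packet from the source. A length of 0 means no TM is pending.
///             let read_len = tm_source
///                 .retrieve_packet(tm_buffer)
///                 .map_err(TcpTmtcError::TmError)?;
///             if read_len == 0 {
///                 return Ok(tm_was_sent);
///             }
///             stream.write_all(&tm_buffer[..read_len])?;
///             conn_result.num_sent_tms += 1;
///             tm_was_sent = true;
///         }
///     }
/// }
/// ```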
pub trait TcpTmSender<TmError, TcError> {
    fn handle_tm_sending(
        &mut self,
        tm_buffer: &mut [u8],
        tm_source: &mut (impl PacketSource<Error = TmError> + ?Sized),
        conn_result: &mut HandledConnectionInfo,
        stream: &mut TcpStream,
    ) -> Result<bool, TcpTmtcError<TmError, TcError>>;
}

/// TCP TMTC server implementation for exchange of generic TMTC packets in a generic way which
/// stays agnostic to the encoding scheme and format used for both telecommands and telemetry.
///
/// This server implements a generic TMTC handling logic and allows modifying its behaviour
/// through the following 4 core abstractions:
///
/// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client.
/// 2. Parsed telecommands will be sent using the [PacketSenderRaw] object.
/// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client.
/// 4. [PacketSource] as a generic TM source used by the [TcpTmSender].
///
/// It is possible to specify custom abstractions to build a dedicated TCP TMTC server without
/// having to re-implement common logic.
///
/// Currently, this framework offers the following concrete implementations:
///
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
/// 2. [TcpSpacepacketsServer] to exchange space packets via TCP.
pub struct TcpTmtcGenericServer<
    TmSource: PacketSource<Error = TmError>,
    TcSender: PacketSenderRaw<Error = TcSendError>,
    TmSender: TcpTmSender<TmError, TcSendError>,
    TcParser: TcpTcParser<TmError, TcSendError>,
    HandledConnection: HandledConnectionHandler,
    TmError,
    TcSendError,
> {
    pub id: ComponentId,
    pub finished_handler: HandledConnection,
    pub(crate) listener: TcpListener,
    pub(crate) inner_loop_delay: Duration,
    pub(crate) tm_source: TmSource,
    pub(crate) tm_buffer: Vec<u8>,
    pub(crate) tc_sender: TcSender,
    pub(crate) tc_buffer: Vec<u8>,
    poll: Poll,
    events: Events,
    pub tc_handler: TcParser,
    pub tm_handler: TmSender,
    stop_signal: Option<Arc<AtomicBool>>,
}

impl<
        TmSource: PacketSource<Error = TmError>,
        TcSender: PacketSenderRaw<Error = TcSendError>,
        TmSender: TcpTmSender<TmError, TcSendError>,
        TcParser: TcpTcParser<TmError, TcSendError>,
        HandledConnection: HandledConnectionHandler,
        TmError: 'static,
        TcSendError: 'static,
    >
    TcpTmtcGenericServer<
        TmSource,
        TcSender,
        TmSender,
        TcParser,
        HandledConnection,
        TmError,
        TcSendError,
    >
{
    /// Create a new generic TMTC server instance.
    ///
    /// ## Parameters
    ///
    /// * `cfg` - Configuration of the server.
    /// * `tc_parser` - Parser which extracts telecommands from the raw bytestream received from
    ///    the client.
    /// * `tm_sender` - Sends back telemetry to the client using the specified TM source.
    /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
    ///     then sent back to the client.
    /// * `tc_sender` - Any received telecommand which was decoded successfully will be forwarded
    ///     using this TC sender.
    /// * `stop_signal` - Can be used to stop the server even if a connection is ongoing.
    pub fn new(
        cfg: ServerConfig,
        tc_parser: TcParser,
        tm_sender: TmSender,
        tm_source: TmSource,
        tc_sender: TcSender,
        finished_handler: HandledConnection,
        stop_signal: Option<Arc<AtomicBool>>,
    ) -> Result<Self, std::io::Error> {
        // Create the raw socket with socket2 so that the reuse options can be set before binding.
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;

        socket.set_reuse_address(cfg.reuse_addr)?;
        #[cfg(unix)]
        socket.set_reuse_port(cfg.reuse_port)?;
        // MIO does not do this for us. We want the accept calls to be non-blocking.
        socket.set_nonblocking(true)?;
        let addr = (cfg.addr).into();
        socket.bind(&addr)?;
        socket.listen(128)?;

        // Create a poll instance.
        let poll = Poll::new()?;
        // Create storage for events.
        let events = Events::with_capacity(32);
        let listener: std::net::TcpListener = socket.into();
        let mut mio_listener = TcpListener::from_std(listener);

        // Start listening for incoming connections.
        poll.registry().register(
            &mut mio_listener,
            Token(0),
            Interest::READABLE | Interest::WRITABLE,
        )?;

        Ok(Self {
            id: cfg.id,
            tc_handler: tc_parser,
            tm_handler: tm_sender,
            poll,
            events,
            listener: mio_listener,
            inner_loop_delay: cfg.inner_loop_delay,
            tm_source,
            tm_buffer: vec![0; cfg.tm_buffer_size],
            tc_sender,
            tc_buffer: vec![0; cfg.tc_buffer_size],
            stop_signal,
            finished_handler,
        })
    }

    /// Retrieve a mutable reference to the internal [TcpListener].
    pub fn listener(&mut self) -> &mut TcpListener {
        &mut self.listener
    }

    /// Can be used to retrieve the locally assigned address of the TCP server. This is especially
    /// useful if using the port number 0 for OS auto-assignment.
    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
        self.listener.local_addr()
    }

    /// This call is used to handle all connections from clients. Right now, it performs
    /// the following steps:
    ///
    /// 1. It polls the listener for incoming connections, using the optionally specified poll
    ///    timeout, and accepts all pending connections.
    /// 2. It reads all the telecommands from the client and parses all received data using the
    ///    user specified [TcpTcParser].
    /// 3. After reading and parsing all telecommands, it sends back all telemetry using the
    ///    user specified [TcpTmSender].
    ///
    /// The server will delay for a user-specified period if the client stays connected for
    /// prolonged periods and there is no traffic for the server. This is the case if the
    /// client does not send any telecommands and no telemetry needs to be sent back to the client.
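    ///
    /// A sketch of how the server is typically driven from a dedicated thread; the `server`
    /// variable and the poll timeout below are example placeholders:
    ///
    /// ```ignore
    /// loop {
    ///     match server.handle_all_connections(Some(Duration::from_millis(400))) {
    ///         // No client connected within the poll timeout, simply poll again.
    ///         Ok(ConnectionResult::AcceptTimeout) => (),
    ///         Ok(ConnectionResult::HandledConnections(num)) => {
    ///             println!("handled {num} connection(s)");
    ///         }
    ///         Err(e) => {
    ///             eprintln!("TCP server error: {e:?}");
    ///             break;
    ///         }
    ///     }
    /// }
    /// ```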
    pub fn handle_all_connections(
        &mut self,
        poll_timeout: Option<Duration>,
    ) -> Result<ConnectionResult, TcpTmtcError<TmError, TcSendError>> {
        let mut handled_connections = 0;
        // Poll Mio for events.
        self.poll.poll(&mut self.events, poll_timeout)?;
        let mut acceptable_connection = false;
        // Process each event.
        for event in self.events.iter() {
            if event.token() == Token(0) {
                acceptable_connection = true;
            } else {
                // Should never happen.
                panic!("unexpected TCP event token");
            }
        }
        // I'd love to do this in the loop above, but there are issues with multiple borrows.
        if acceptable_connection {
            // There might be multiple connections available. Accept until all of them have
            // been handled.
            loop {
                match self.listener.accept() {
                    Ok((stream, addr)) => {
                        if let Err(e) = self.handle_accepted_connection(stream, addr) {
                            self.reregister_poll_interest()?;
                            return Err(e);
                        }
                        handled_connections += 1;
                    }
                    Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
                    Err(err) => {
                        self.reregister_poll_interest()?;
                        return Err(TcpTmtcError::Io(err));
                    }
                }
            }
        }
        if handled_connections > 0 {
            return Ok(ConnectionResult::HandledConnections(handled_connections));
        }
        Ok(ConnectionResult::AcceptTimeout)
    }

    fn reregister_poll_interest(&mut self) -> io::Result<()> {
        self.poll.registry().reregister(
            &mut self.listener,
            Token(0),
            Interest::READABLE | Interest::WRITABLE,
        )
    }

    fn handle_accepted_connection(
        &mut self,
        mut stream: TcpStream,
        addr: SocketAddr,
    ) -> Result<(), TcpTmtcError<TmError, TcSendError>> {
        let mut current_write_idx = 0;
        let mut next_write_idx = 0;
        let mut connection_result = HandledConnectionInfo::new(addr);
        loop {
            let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
            match read_result {
                Ok(0) => {
                    // Connection closed by client. If any TC was read, parse for complete packets.
                    // After that, break the outer loop.
                    if current_write_idx > 0 {
                        self.tc_handler.handle_tc_parsing(
                            &mut self.tc_buffer,
                            self.id,
                            &self.tc_sender,
                            &mut connection_result,
                            current_write_idx,
                            &mut next_write_idx,
                        )?;
                    }
                    break;
                }
                Ok(read_len) => {
                    current_write_idx += read_len;
                    // TC buffer is full, we must parse for complete packets now.
                    if current_write_idx == self.tc_buffer.capacity() {
                        self.tc_handler.handle_tc_parsing(
                            &mut self.tc_buffer,
                            self.id,
                            &self.tc_sender,
                            &mut connection_result,
                            current_write_idx,
                            &mut next_write_idx,
                        )?;
                        current_write_idx = next_write_idx;
                    }
                }
                Err(e) => match e.kind() {
                    // As per [TcpStream::set_read_timeout] documentation, this should work for
                    // both UNIX and Windows.
                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
                        self.tc_handler.handle_tc_parsing(
                            &mut self.tc_buffer,
                            self.id,
                            &self.tc_sender,
                            &mut connection_result,
                            current_write_idx,
                            &mut next_write_idx,
                        )?;
                        current_write_idx = next_write_idx;

                        if !self.tm_handler.handle_tm_sending(
                            &mut self.tm_buffer,
                            &mut self.tm_source,
                            &mut connection_result,
                            &mut stream,
                        )? {
                            // No TC read, no TM was sent, but the client has not disconnected.
                            // Perform an inner delay to avoid burning CPU time.
                            thread::sleep(self.inner_loop_delay);
                            // Optional stop signal handling.
                            if let Some(stop_signal) = &self.stop_signal {
                                if stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
                                    connection_result.stopped_by_signal = true;
                                    self.finished_handler.handled_connection(connection_result);
                                    return Ok(());
                                }
                            }
                        }
                    }
                    _ => {
                        return Err(TcpTmtcError::Io(e));
                    }
                },
            }
        }
        self.tm_handler.handle_tm_sending(
            &mut self.tm_buffer,
            &mut self.tm_source,
            &mut connection_result,
            &mut stream,
        )?;
        self.finished_handler.handled_connection(connection_result);
        Ok(())
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Mutex;

    use alloc::{collections::VecDeque, sync::Arc, vec::Vec};

    use crate::tmtc::PacketSource;

    use super::*;

    #[derive(Default, Clone)]
    pub(crate) struct SyncTmSource {
        tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    }

    impl SyncTmSource {
        pub(crate) fn add_tm(&mut self, tm: &[u8]) {
            let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
            tm_queue.push_back(tm.to_vec());
        }
    }

    impl PacketSource for SyncTmSource {
        type Error = ();

        fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
            let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
            if !tm_queue.is_empty() {
                let next_vec = tm_queue.front().unwrap();
                if buffer.len() < next_vec.len() {
                    panic!(
                        "provided buffer too small, must be at least {} bytes",
                        next_vec.len()
                    );
                }
                let next_vec = tm_queue.pop_front().unwrap();
                buffer[0..next_vec.len()].copy_from_slice(&next_vec);
                return Ok(next_vec.len());
            }
            Ok(0)
        }
    }

    #[derive(Default)]
    pub struct ConnectionFinishedHandler {
        connection_info: VecDeque<HandledConnectionInfo>,
    }

    impl HandledConnectionHandler for ConnectionFinishedHandler {
        fn handled_connection(&mut self, info: HandledConnectionInfo) {
            self.connection_info.push_back(info);
        }
    }

    impl ConnectionFinishedHandler {
        pub fn check_last_connection(&mut self, num_tms: u32, num_tcs: u32) {
            let last_conn_result = self
                .connection_info
                .pop_back()
                .expect("no connection info available");
            assert_eq!(last_conn_result.num_received_tcs, num_tcs);
            assert_eq!(last_conn_result.num_sent_tms, num_tms);
        }

        pub fn check_no_connections_left(&self) {
            assert!(self.connection_info.is_empty());
        }
    }
}