satrs_core/hal/std/
tcp_spacepackets_server.rs

1use delegate::delegate;
2use std::{
3    io::Write,
4    net::{SocketAddr, TcpListener, TcpStream},
5};
6
7use alloc::boxed::Box;
8
9use crate::{
10    encoding::{ccsds::PacketIdLookup, parse_buffer_for_ccsds_space_packets},
11    tmtc::{ReceivesTc, TmPacketSource},
12};
13
14use super::tcp_server::{
15    ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
16};
17
/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer].
///
/// The parser owns the [PacketIdLookup] which is handed to
/// [parse_buffer_for_ccsds_space_packets] every time received telecommand bytes are parsed.
pub struct SpacepacketsTcParser {
    /// Lookup passed to the space packet parser on every parsing call.
    packet_id_lookup: Box<dyn PacketIdLookup + Send>,
}
22
23impl SpacepacketsTcParser {
24    pub fn new(packet_id_lookup: Box<dyn PacketIdLookup + Send>) -> Self {
25        Self { packet_id_lookup }
26    }
27}
28
29impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for SpacepacketsTcParser {
30    fn handle_tc_parsing(
31        &mut self,
32        tc_buffer: &mut [u8],
33        tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
34        conn_result: &mut ConnectionResult,
35        current_write_idx: usize,
36        next_write_idx: &mut usize,
37    ) -> Result<(), TcpTmtcError<TmError, TcError>> {
38        // Reader vec full, need to parse for packets.
39        conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
40            &mut tc_buffer[..current_write_idx],
41            self.packet_id_lookup.as_ref(),
42            tc_receiver.upcast_mut(),
43            next_write_idx,
44        )
45        .map_err(|e| TcpTmtcError::TcError(e))?;
46        Ok(())
47    }
48}
49
/// Concrete [TcpTmSender] implementation for the [TcpSpacepacketsServer].
///
/// The sender has no fields and therefore carries no state between calls. Every packet
/// retrieved from the TM source is written to the TCP stream without any additional framing.
#[derive(Default)]
pub struct SpacepacketsTmSender {}
53
54impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
55    fn handle_tm_sending(
56        &mut self,
57        tm_buffer: &mut [u8],
58        tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
59        conn_result: &mut ConnectionResult,
60        stream: &mut TcpStream,
61    ) -> Result<bool, TcpTmtcError<TmError, TcError>> {
62        let mut tm_was_sent = false;
63        loop {
64            // Write TM until TM source is exhausted. For now, there is no limit for the amount
65            // of TM written this way.
66            let read_tm_len = tm_source
67                .retrieve_packet(tm_buffer)
68                .map_err(|e| TcpTmtcError::TmError(e))?;
69
70            if read_tm_len == 0 {
71                return Ok(tm_was_sent);
72            }
73            tm_was_sent = true;
74            conn_result.num_sent_tms += 1;
75
76            stream.write_all(&tm_buffer[..read_tm_len])?;
77        }
78    }
79}
80
81/// TCP TMTC server implementation for exchange of tightly stuffed
82/// [CCSDS space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf).
83///
84/// This serves only works if
85/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only
86/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter
87/// and start marker when parsing for packets. The user specifies a set of expected
88/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
89///
90/// ## Example
91/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
92/// also serves as the example application for this module.
93pub struct TcpSpacepacketsServer<
94    TmError,
95    TcError: 'static,
96    TmSource: TmPacketSource<Error = TmError>,
97    TcReceiver: ReceivesTc<Error = TcError>,
98> {
99    generic_server: TcpTmtcGenericServer<
100        TmError,
101        TcError,
102        TmSource,
103        TcReceiver,
104        SpacepacketsTmSender,
105        SpacepacketsTcParser,
106    >,
107}
108
109impl<
110        TmError: 'static,
111        TcError: 'static,
112        TmSource: TmPacketSource<Error = TmError>,
113        TcReceiver: ReceivesTc<Error = TcError>,
114    > TcpSpacepacketsServer<TmError, TcError, TmSource, TcReceiver>
115{
116    ///
117    /// ## Parameter
118    ///
119    /// * `cfg` - Configuration of the server.
120    /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
121    ///     then sent back to the client.
122    /// * `tc_receiver` - Any received telecommands which were decoded successfully will be
123    ///     forwarded to this TC receiver.
124    /// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet
125    ///     parsing. This mechanism is used to have a start marker for finding CCSDS packets.
126    pub fn new(
127        cfg: ServerConfig,
128        tm_source: TmSource,
129        tc_receiver: TcReceiver,
130        packet_id_lookup: Box<dyn PacketIdLookup + Send>,
131    ) -> Result<Self, std::io::Error> {
132        Ok(Self {
133            generic_server: TcpTmtcGenericServer::new(
134                cfg,
135                SpacepacketsTcParser::new(packet_id_lookup),
136                SpacepacketsTmSender::default(),
137                tm_source,
138                tc_receiver,
139            )?,
140        })
141    }
142
143    delegate! {
144        to self.generic_server {
145            pub fn listener(&mut self) -> &mut TcpListener;
146
147            /// Can be used to retrieve the local assigned address of the TCP server. This is especially
148            /// useful if using the port number 0 for OS auto-assignment.
149            pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
150
151            /// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
152            pub fn handle_next_connection(
153                &mut self,
154            ) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
155        }
156    }
157}
158
#[cfg(test)]
mod tests {
    use core::{
        sync::atomic::{AtomicBool, Ordering},
        time::Duration,
    };
    // NOTE(review): presumably kept in scope for ad-hoc debug output while working on the
    // tests, hence the lint exemption - confirm before removing.
    #[allow(unused_imports)]
    use std::println;
    use std::{
        io::{Read, Write},
        net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
        thread,
    };

    use alloc::{boxed::Box, sync::Arc};
    use hashbrown::HashSet;
    use spacepackets::{
        ecss::{tc::PusTcCreator, WritablePusPacket},
        PacketId, SpHeader,
    };

    use crate::hal::std::tcp_server::{
        tests::{SyncTcCacher, SyncTmSource},
        ServerConfig,
    };

    use super::TcpSpacepacketsServer;

    const TEST_APID_0: u16 = 0x02;
    const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
    const TEST_APID_1: u16 = 0x10;
    const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1);

    /// Creates the server under test, bound to `addr`, using the given TC receiver, TM source
    /// and set of expected packet IDs. Panics if the server can not be created.
    fn generic_tmtc_server(
        addr: &SocketAddr,
        tc_receiver: SyncTcCacher,
        tm_source: SyncTmSource,
        packet_id_lookup: HashSet<PacketId>,
    ) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher> {
        TcpSpacepacketsServer::new(
            ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
            tm_source,
            tc_receiver,
            Box::new(packet_id_lookup),
        )
        .expect("TCP server generation failed")
    }

    /// Localhost address with port 0, which lets the OS assign a free port.
    fn local_auto_port_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0)
    }

    /// Waits a limited amount of time for the connection handler thread to set `conn_handled`
    /// and panics if the flag is still unset afterwards.
    ///
    /// The flag is polled three times with a 5 ms sleep after each unsuccessful poll, followed
    /// by one final check.
    fn wait_for_connection_handled(conn_handled: &AtomicBool) {
        for _ in 0..3 {
            if conn_handled.load(Ordering::Relaxed) {
                return;
            }
            thread::sleep(Duration::from_millis(5));
        }
        assert!(
            conn_handled.load(Ordering::Relaxed),
            "connection was not handled properly"
        );
    }

    /// Sends a single ping telecommand and checks that the server forwards exactly that packet
    /// to the TC receiver without sending any telemetry.
    #[test]
    fn test_basic_tc_only() {
        let auto_port_addr = local_auto_port_addr();
        let tc_receiver = SyncTcCacher::default();
        let tm_source = SyncTmSource::default();
        let mut packet_id_lookup = HashSet::new();
        packet_id_lookup.insert(TEST_PACKET_ID_0);
        let mut tcp_server = generic_tmtc_server(
            &auto_port_addr,
            tc_receiver.clone(),
            tm_source,
            packet_id_lookup,
        );
        let dest_addr = tcp_server
            .local_addr()
            .expect("retrieving dest addr failed");
        let conn_handled: Arc<AtomicBool> = Default::default();
        let set_if_done = conn_handled.clone();
        // Call the connection handler in separate thread, does block.
        thread::spawn(move || {
            let conn_result = tcp_server
                .handle_next_connection()
                .expect("handling connection failed");
            assert_eq!(conn_result.num_received_tcs, 1);
            assert_eq!(conn_result.num_sent_tms, 0);
            set_if_done.store(true, Ordering::Relaxed);
        });
        let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
        let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
        let tc_0 = ping_tc.to_vec().expect("packet generation failed");
        let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
        stream
            .write_all(&tc_0)
            .expect("writing to TCP server failed");
        drop(stream);

        // A certain amount of time is allowed for the transaction to complete.
        wait_for_connection_handled(&conn_handled);

        // Check that TC has arrived.
        let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
        assert_eq!(tc_queue.len(), 1);
        assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
    }

    /// Sends two telecommands with different APIDs, expects two telemetry packets back and
    /// checks that both telecommands arrive at the TC receiver in order.
    #[test]
    fn test_multi_tc_multi_tm() {
        let auto_port_addr = local_auto_port_addr();
        let tc_receiver = SyncTcCacher::default();
        let mut tm_source = SyncTmSource::default();

        // Add telemetry. The packets are built with the TC creator; for this test they only
        // serve as byte payloads handed out by the TM source.
        let mut total_tm_len = 0;
        let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
        let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
        let tm_0 = verif_tm.to_vec().expect("writing packet failed");
        total_tm_len += tm_0.len();
        tm_source.add_tm(&tm_0);
        let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
        let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true);
        let tm_1 = verif_tm.to_vec().expect("writing packet failed");
        total_tm_len += tm_1.len();
        tm_source.add_tm(&tm_1);

        // Set up server
        let mut packet_id_lookup = HashSet::new();
        packet_id_lookup.insert(TEST_PACKET_ID_0);
        packet_id_lookup.insert(TEST_PACKET_ID_1);
        let mut tcp_server = generic_tmtc_server(
            &auto_port_addr,
            tc_receiver.clone(),
            tm_source,
            packet_id_lookup,
        );
        let dest_addr = tcp_server
            .local_addr()
            .expect("retrieving dest addr failed");
        let conn_handled: Arc<AtomicBool> = Default::default();
        let set_if_done = conn_handled.clone();

        // Call the connection handler in separate thread, does block.
        thread::spawn(move || {
            let conn_result = tcp_server
                .handle_next_connection()
                .expect("handling connection failed");
            assert_eq!(
                conn_result.num_received_tcs, 2,
                "wrong number of received TCs"
            );
            assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs");
            set_if_done.store(true, Ordering::Relaxed);
        });
        let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
        stream
            .set_read_timeout(Some(Duration::from_millis(10)))
            .expect("setting reas timeout failed");

        // Send telecommands
        let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
        let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
        let tc_0 = ping_tc.to_vec().expect("ping tc creation failed");
        stream
            .write_all(&tc_0)
            .expect("writing to TCP server failed");
        let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
        let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
        let tc_1 = action_tc.to_vec().expect("action tc creation failed");
        stream
            .write_all(&tc_1)
            .expect("writing to TCP server failed");

        // Done with writing.
        stream
            .shutdown(std::net::Shutdown::Write)
            .expect("shutting down write failed");
        let mut read_buf: [u8; 32] = [0; 32];
        assert!(
            total_tm_len <= read_buf.len(),
            "read buffer too small for the expected telemetry"
        );
        // `read_exact` fails on a read timeout as well as on a premature end of the stream, so
        // the test can neither block forever nor spin on zero-length reads.
        stream
            .read_exact(&mut read_buf[..total_tm_len])
            .expect("read failed");
        drop(stream);
        assert_eq!(read_buf[..tm_0.len()], tm_0);
        assert_eq!(read_buf[tm_0.len()..tm_0.len() + tm_1.len()], tm_1);

        // A certain amount of time is allowed for the transaction to complete.
        wait_for_connection_handled(&conn_handled);

        // Check that TC has arrived.
        let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
        assert_eq!(tc_queue.len(), 2);
        assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
        assert_eq!(tc_queue.pop_front().unwrap(), tc_1);
    }
}