satrs_core/tmtc/
pus_distrib.rs

1//! ECSS PUS packet routing components.
2//!
3//! The routing components consist of two core components:
4//!  1. [PusDistributor] component which dispatches received packets to a user-provided handler.
5//!  2. [PusServiceProvider] trait which should be implemented by the user-provided PUS packet
6//!     handler.
7//!
8//! The [PusDistributor] implements the [ReceivesEcssPusTc], [ReceivesCcsdsTc] and the
9//! [ReceivesTcCore] trait which allows to pass raw packets, CCSDS packets and PUS TC packets into
10//! it. Upon receiving a packet, it performs the following steps:
11//!
12//! 1. It tries to extract the [SpHeader] and [spacepackets::ecss::tc::PusTcReader] objects from
13//!    the raw bytestream. If this process fails, a [PusDistribError::PusError] is returned to the
14//!    user.
15//! 2. If it was possible to extract both components, the packet will be passed to the
16//!    [PusServiceProvider::handle_pus_tc_packet] method provided by the user.
17//!
18//! # Example
19//!
20//! ```rust
21//! use spacepackets::ecss::WritablePusPacket;
22//! use satrs_core::tmtc::pus_distrib::{PusDistributor, PusServiceProvider};
23//! use satrs_core::tmtc::{ReceivesTc, ReceivesTcCore};
24//! use spacepackets::SpHeader;
25//! use spacepackets::ecss::tc::{PusTcCreator, PusTcReader};
26//! struct ConcretePusHandler {
27//!     handler_call_count: u32
28//! }
29//!
30//! // This is a very simple possible service provider. It increments an internal call count field,
31//! // which is used to verify the handler was called
32//! impl PusServiceProvider for ConcretePusHandler {
33//!     type Error = ();
34//!     fn handle_pus_tc_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
35//!         assert_eq!(service, 17);
36//!         assert_eq!(pus_tc.len_packed(), 13);
37//!         self.handler_call_count += 1;
38//!         Ok(())
39//!     }
40//! }
41//!
42//! let service_handler = ConcretePusHandler {
43//!     handler_call_count: 0
44//! };
45//! let mut pus_distributor = PusDistributor::new(Box::new(service_handler));
46//!
47//! // Create and pass PUS ping telecommand with a valid APID
48//! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap();
49//! let mut pus_tc = PusTcCreator::new_simple(&mut space_packet_header, 17, 1, None, true);
50//! let mut test_buf: [u8; 32] = [0; 32];
51//! let mut size = pus_tc
52//!     .write_to_bytes(test_buf.as_mut_slice())
53//!     .expect("Error writing TC to buffer");
54//! let tc_slice = &test_buf[0..size];
55//!
56//! pus_distributor.pass_tc(tc_slice).expect("Passing PUS telecommand failed");
57//!
58//! // User helper function to retrieve concrete class. We check the call count here to verify
59//! // that the PUS ping telecommand was routed successfully.
60//! let concrete_handler_ref: &ConcretePusHandler = pus_distributor
61//!     .service_provider_ref()
62//!     .expect("Casting back to concrete type failed");
63//! assert_eq!(concrete_handler_ref.handler_call_count, 1);
64//! ```
65use crate::pus::ReceivesEcssPusTc;
66use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore};
67use alloc::boxed::Box;
68use core::fmt::{Display, Formatter};
69use downcast_rs::Downcast;
70use spacepackets::ecss::tc::PusTcReader;
71use spacepackets::ecss::{PusError, PusPacket};
72use spacepackets::SpHeader;
73#[cfg(feature = "std")]
74use std::error::Error;
75
76pub trait PusServiceProvider: Downcast {
77    type Error;
78    fn handle_pus_tc_packet(
79        &mut self,
80        service: u8,
81        header: &SpHeader,
82        pus_tc: &PusTcReader,
83    ) -> Result<(), Self::Error>;
84}
85downcast_rs::impl_downcast!(PusServiceProvider assoc Error);
86
87pub trait SendablePusServiceProvider: PusServiceProvider + Send {}
88
89impl<T: Send + PusServiceProvider> SendablePusServiceProvider for T {}
90
91downcast_rs::impl_downcast!(SendablePusServiceProvider assoc Error);
92
93/// Generic distributor object which dispatches received packets to a user provided handler.
94///
95/// This distributor expects the passed trait object to be [Send]able to allow more ergonomic
96/// usage with threads.
97pub struct PusDistributor<E> {
98    pub service_provider: Box<dyn SendablePusServiceProvider<Error = E>>,
99}
100
101impl<E> PusDistributor<E> {
102    pub fn new(service_provider: Box<dyn SendablePusServiceProvider<Error = E>>) -> Self {
103        PusDistributor { service_provider }
104    }
105}
106
107#[derive(Debug, Copy, Clone, PartialEq, Eq)]
108pub enum PusDistribError<E> {
109    CustomError(E),
110    PusError(PusError),
111}
112
113impl<E: Display> Display for PusDistribError<E> {
114    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
115        match self {
116            PusDistribError::CustomError(e) => write!(f, "{e}"),
117            PusDistribError::PusError(e) => write!(f, "{e}"),
118        }
119    }
120}
121
122#[cfg(feature = "std")]
123impl<E: Error> Error for PusDistribError<E> {
124    fn source(&self) -> Option<&(dyn Error + 'static)> {
125        match self {
126            Self::CustomError(e) => e.source(),
127            Self::PusError(e) => e.source(),
128        }
129    }
130}
131
132impl<E: 'static> ReceivesTcCore for PusDistributor<E> {
133    type Error = PusDistribError<E>;
134    fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> {
135        // Convert to ccsds and call pass_ccsds
136        let (sp_header, _) = SpHeader::from_be_bytes(tm_raw)
137            .map_err(|e| PusDistribError::PusError(PusError::ByteConversion(e)))?;
138        self.pass_ccsds(&sp_header, tm_raw)
139    }
140}
141
142impl<E: 'static> ReceivesCcsdsTc for PusDistributor<E> {
143    type Error = PusDistribError<E>;
144    fn pass_ccsds(&mut self, header: &SpHeader, tm_raw: &[u8]) -> Result<(), Self::Error> {
145        let (tc, _) = PusTcReader::new(tm_raw).map_err(|e| PusDistribError::PusError(e))?;
146        self.pass_pus_tc(header, &tc)
147    }
148}
149
150impl<E: 'static> ReceivesEcssPusTc for PusDistributor<E> {
151    type Error = PusDistribError<E>;
152    fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
153        self.service_provider
154            .handle_pus_tc_packet(pus_tc.service(), header, pus_tc)
155            .map_err(|e| PusDistribError::CustomError(e))
156    }
157}
158
159impl<E: 'static> PusDistributor<E> {
160    pub fn service_provider_ref<T: SendablePusServiceProvider<Error = E>>(&self) -> Option<&T> {
161        self.service_provider.downcast_ref::<T>()
162    }
163
164    pub fn service_provider_mut<T: SendablePusServiceProvider<Error = E>>(
165        &mut self,
166    ) -> Option<&mut T> {
167        self.service_provider.downcast_mut::<T>()
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use super::*;
174    use crate::tmtc::ccsds_distrib::tests::{
175        generate_ping_tc, BasicApidHandlerOwnedQueue, BasicApidHandlerSharedQueue,
176    };
177    use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler};
178    use alloc::vec::Vec;
179    use spacepackets::ecss::PusError;
180    use spacepackets::CcsdsPacket;
181    #[cfg(feature = "std")]
182    use std::collections::VecDeque;
183    #[cfg(feature = "std")]
184    use std::sync::{Arc, Mutex};
185
186    fn is_send<T: Send>(_: &T) {}
187
188    struct PusHandlerSharedQueue {
189        pub pus_queue: Arc<Mutex<VecDeque<(u8, u16, Vec<u8>)>>>,
190    }
191
192    #[derive(Default)]
193    struct PusHandlerOwnedQueue {
194        pub pus_queue: VecDeque<(u8, u16, Vec<u8>)>,
195    }
196
197    impl PusServiceProvider for PusHandlerSharedQueue {
198        type Error = PusError;
199        fn handle_pus_tc_packet(
200            &mut self,
201            service: u8,
202            sp_header: &SpHeader,
203            pus_tc: &PusTcReader,
204        ) -> Result<(), Self::Error> {
205            let mut vec: Vec<u8> = Vec::new();
206            vec.extend_from_slice(pus_tc.raw_data());
207            Ok(self
208                .pus_queue
209                .lock()
210                .expect("Mutex lock failed")
211                .push_back((service, sp_header.apid(), vec)))
212        }
213    }
214
215    impl PusServiceProvider for PusHandlerOwnedQueue {
216        type Error = PusError;
217        fn handle_pus_tc_packet(
218            &mut self,
219            service: u8,
220            sp_header: &SpHeader,
221            pus_tc: &PusTcReader,
222        ) -> Result<(), Self::Error> {
223            let mut vec: Vec<u8> = Vec::new();
224            vec.extend_from_slice(pus_tc.raw_data());
225            Ok(self.pus_queue.push_back((service, sp_header.apid(), vec)))
226        }
227    }
228
229    struct ApidHandlerShared {
230        pub pus_distrib: PusDistributor<PusError>,
231        pub handler_base: BasicApidHandlerSharedQueue,
232    }
233
234    struct ApidHandlerOwned {
235        pub pus_distrib: PusDistributor<PusError>,
236        handler_base: BasicApidHandlerOwnedQueue,
237    }
238
239    macro_rules! apid_handler_impl {
240        () => {
241            type Error = PusError;
242
243            fn valid_apids(&self) -> &'static [u16] {
244                &[0x000, 0x002]
245            }
246
247            fn handle_known_apid(
248                &mut self,
249                sp_header: &SpHeader,
250                tc_raw: &[u8],
251            ) -> Result<(), Self::Error> {
252                self.handler_base
253                    .handle_known_apid(&sp_header, tc_raw)
254                    .ok()
255                    .expect("Unexpected error");
256                match self.pus_distrib.pass_ccsds(&sp_header, tc_raw) {
257                    Ok(_) => Ok(()),
258                    Err(e) => match e {
259                        PusDistribError::CustomError(_) => Ok(()),
260                        PusDistribError::PusError(e) => Err(e),
261                    },
262                }
263            }
264
265            fn handle_unknown_apid(
266                &mut self,
267                sp_header: &SpHeader,
268                tc_raw: &[u8],
269            ) -> Result<(), Self::Error> {
270                self.handler_base
271                    .handle_unknown_apid(&sp_header, tc_raw)
272                    .ok()
273                    .expect("Unexpected error");
274                Ok(())
275            }
276        };
277    }
278
279    impl CcsdsPacketHandler for ApidHandlerOwned {
280        apid_handler_impl!();
281    }
282
283    impl CcsdsPacketHandler for ApidHandlerShared {
284        apid_handler_impl!();
285    }
286
287    #[test]
288    #[cfg(feature = "std")]
289    fn test_pus_distribution() {
290        let known_packet_queue = Arc::new(Mutex::default());
291        let unknown_packet_queue = Arc::new(Mutex::default());
292        let pus_queue = Arc::new(Mutex::default());
293        let pus_handler = PusHandlerSharedQueue {
294            pus_queue: pus_queue.clone(),
295        };
296        let handler_base = BasicApidHandlerSharedQueue {
297            known_packet_queue: known_packet_queue.clone(),
298            unknown_packet_queue: unknown_packet_queue.clone(),
299        };
300
301        let pus_distrib = PusDistributor {
302            service_provider: Box::new(pus_handler),
303        };
304        is_send(&pus_distrib);
305        let apid_handler = ApidHandlerShared {
306            pus_distrib,
307            handler_base,
308        };
309        let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler));
310        let mut test_buf: [u8; 32] = [0; 32];
311        let tc_slice = generate_ping_tc(test_buf.as_mut_slice());
312
313        // Pass packet to distributor
314        ccsds_distrib
315            .pass_tc(tc_slice)
316            .expect("Passing TC slice failed");
317        let recvd_ccsds = known_packet_queue.lock().unwrap().pop_front();
318        assert!(unknown_packet_queue.lock().unwrap().is_empty());
319        assert!(recvd_ccsds.is_some());
320        let (apid, packet) = recvd_ccsds.unwrap();
321        assert_eq!(apid, 0x002);
322        assert_eq!(packet.as_slice(), tc_slice);
323        let recvd_pus = pus_queue.lock().unwrap().pop_front();
324        assert!(recvd_pus.is_some());
325        let (service, apid, tc_raw) = recvd_pus.unwrap();
326        assert_eq!(service, 17);
327        assert_eq!(apid, 0x002);
328        assert_eq!(tc_raw, tc_slice);
329    }
330
331    #[test]
332    fn test_as_any_cast() {
333        let pus_handler = PusHandlerOwnedQueue::default();
334        let handler_base = BasicApidHandlerOwnedQueue::default();
335        let pus_distrib = PusDistributor {
336            service_provider: Box::new(pus_handler),
337        };
338
339        let apid_handler = ApidHandlerOwned {
340            pus_distrib,
341            handler_base,
342        };
343        let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler));
344
345        let mut test_buf: [u8; 32] = [0; 32];
346        let tc_slice = generate_ping_tc(test_buf.as_mut_slice());
347
348        ccsds_distrib
349            .pass_tc(tc_slice)
350            .expect("Passing TC slice failed");
351
352        let apid_handler_casted_back: &mut ApidHandlerOwned = ccsds_distrib
353            .apid_handler_mut()
354            .expect("Cast to concrete type ApidHandler failed");
355        assert!(!apid_handler_casted_back
356            .handler_base
357            .known_packet_queue
358            .is_empty());
359        let handler_casted_back: &mut PusHandlerOwnedQueue = apid_handler_casted_back
360            .pus_distrib
361            .service_provider_mut()
362            .expect("Cast to concrete type PusHandlerOwnedQueue failed");
363        assert!(!handler_casted_back.pus_queue.is_empty());
364        let (service, apid, packet_raw) = handler_casted_back.pus_queue.pop_front().unwrap();
365        assert_eq!(service, 17);
366        assert_eq!(apid, 0x002);
367        assert_eq!(packet_raw.as_slice(), tc_slice);
368    }
369}