1use crate::queue::GenericSendError;
11use crate::{
12 pool::{PoolAddr, PoolError},
13 ComponentId,
14};
15#[cfg(feature = "std")]
16pub use alloc_mod::*;
17#[cfg(feature = "alloc")]
18use downcast_rs::{impl_downcast, Downcast};
19use spacepackets::{
20 ecss::{
21 tc::PusTcReader,
22 tm::{PusTmCreator, PusTmReader},
23 },
24 SpHeader,
25};
26#[cfg(feature = "std")]
27use std::sync::mpsc;
28#[cfg(feature = "std")]
29pub use std_mod::*;
30
31pub mod tm_helper;
32
/// Reference to a packet which is kept in a pool instead of being carried inline.
///
/// Only the pool address of the packet and the ID of the sending component are stored.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PacketInPool {
    /// ID of the component which sent the packet.
    pub sender_id: ComponentId,
    /// Address of the packet inside the pool.
    pub store_addr: PoolAddr,
}
40
41impl PacketInPool {
42 pub fn new(sender_id: ComponentId, store_addr: PoolAddr) -> Self {
43 Self {
44 sender_id,
45 store_addr,
46 }
47 }
48}
49
/// Generic interface for objects which can send a raw packet passed as a byte slice.
///
/// Implementors are required to be [Send].
pub trait PacketSenderRaw: Send {
    /// Error type returned when sending fails.
    type Error;
    /// Sends the raw `packet` on behalf of the component identified by `sender_id`.
    fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error>;
}
56
/// Extension of [PacketSenderRaw] which additionally requires [Downcast] and offers explicit
/// conversions into [PacketSenderRaw] trait objects.
#[cfg(feature = "alloc")]
pub trait PacketSenderRawExt: PacketSenderRaw + Downcast {
    /// Returns `self` as a shared [PacketSenderRaw] trait object.
    fn upcast(&self) -> &dyn PacketSenderRaw<Error = Self::Error>;
    /// Returns `self` as a mutable [PacketSenderRaw] trait object.
    fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw<Error = Self::Error>;
}
67
/// Blanket implementation: every `'static` type which implements [PacketSenderRaw] and is
/// [Send] gets [PacketSenderRawExt] for free when the `alloc` feature is enabled.
#[cfg(feature = "alloc")]
impl<T: PacketSenderRaw + Send + 'static> PacketSenderRawExt for T {
    // The concrete type coerces to the trait object, so returning `self` is sufficient.
    fn upcast(&self) -> &dyn PacketSenderRaw<Error = Self::Error> {
        self
    }

    // Mutable counterpart of `upcast`.
    fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw<Error = Self::Error> {
        self
    }
}
86
// Generates the downcasting helpers for `dyn PacketSenderRawExt` trait objects. The
// `assoc Error` part tells the macro about the associated type of the trait.
#[cfg(feature = "alloc")]
impl_downcast!(PacketSenderRawExt assoc Error);
89
/// Generic interface for objects which can send a CCSDS packet, passed as its space packet
/// header together with the raw bytes.
///
/// Implementors are required to be [Send].
pub trait PacketSenderCcsds: Send {
    /// Error type returned when sending fails.
    type Error;
    /// Sends the raw packet `tc_raw` with the accompanying `header` on behalf of the component
    /// identified by `sender_id`.
    fn send_ccsds(
        &self,
        sender_id: ComponentId,
        header: &SpHeader,
        tc_raw: &[u8],
    ) -> Result<(), Self::Error>;
}
101
/// CCSDS sending over an [mpsc::Sender]: the raw bytes are copied into a [PacketAsVec] and
/// the space packet header is ignored.
#[cfg(feature = "std")]
impl PacketSenderCcsds for mpsc::Sender<PacketAsVec> {
    type Error = GenericSendError;

    fn send_ccsds(
        &self,
        sender_id: ComponentId,
        _sp_header: &SpHeader,
        tc_raw: &[u8],
    ) -> Result<(), Self::Error> {
        let owned_packet = PacketAsVec::new(sender_id, tc_raw.to_vec());
        // Any send error is reported as a disconnected receiver.
        match self.send(owned_packet) {
            Ok(()) => Ok(()),
            Err(_) => Err(GenericSendError::RxDisconnected),
        }
    }
}
116
/// CCSDS sending over an [mpsc::SyncSender]: the raw bytes are copied into a [PacketAsVec]
/// and sent with `try_send`, so a full queue is reported as an error. The space packet header
/// is ignored.
#[cfg(feature = "std")]
impl PacketSenderCcsds for mpsc::SyncSender<PacketAsVec> {
    type Error = GenericSendError;

    fn send_ccsds(
        &self,
        sender_id: ComponentId,
        _sp_header: &SpHeader,
        packet_raw: &[u8],
    ) -> Result<(), Self::Error> {
        let owned_packet = PacketAsVec::new(sender_id, packet_raw.to_vec());
        match self.try_send(owned_packet) {
            Ok(()) => Ok(()),
            Err(mpsc::TrySendError::Full(_)) => Err(GenericSendError::QueueFull(None)),
            Err(mpsc::TrySendError::Disconnected(_)) => Err(GenericSendError::RxDisconnected),
        }
    }
}
134
/// Generic interface for objects from which packets can be retrieved into a caller-provided
/// buffer.
///
/// Implementors are required to be [Send].
pub trait PacketSource: Send {
    /// Error type returned when retrieving a packet fails.
    type Error;
    /// Retrieves a packet into `buffer`.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of bytes written into
    /// `buffer`; confirm against the implementors.
    fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
}
141
/// Extension of [PacketSource] which additionally requires [Downcast] and offers explicit
/// conversions into [PacketSource] trait objects.
#[cfg(feature = "alloc")]
pub trait PacketSourceExt: PacketSource + Downcast {
    /// Returns `self` as a shared [PacketSource] trait object.
    fn upcast(&self) -> &dyn PacketSource<Error = Self::Error>;
    /// Returns `self` as a mutable [PacketSource] trait object.
    fn upcast_mut(&mut self) -> &mut dyn PacketSource<Error = Self::Error>;
}
152
/// Blanket implementation: every `'static` [PacketSource] gets [PacketSourceExt] for free
/// when the `alloc` feature is enabled.
#[cfg(feature = "alloc")]
impl<T: PacketSource + 'static> PacketSourceExt for T {
    // The concrete type coerces to the trait object, so returning `self` is sufficient.
    fn upcast(&self) -> &dyn PacketSource<Error = Self::Error> {
        self
    }

    // Mutable counterpart of `upcast`.
    fn upcast_mut(&mut self) -> &mut dyn PacketSource<Error = Self::Error> {
        self
    }
}
171
172pub trait CcsdsPacketPool {
174 fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<PoolAddr, PoolError> {
175 self.add_raw_tc(tc_raw)
176 }
177
178 fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result<PoolAddr, PoolError>;
179}
180
/// Pool abstraction which can store PUS telecommands.
pub trait PusTcPool {
    /// Stores the telecommand given as a [PusTcReader] and returns the address it was stored at.
    fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<PoolAddr, PoolError>;
}
185
/// Pool abstraction which can store PUS telemetry.
pub trait PusTmPool {
    /// Stores the telemetry given as a [PusTmReader] and returns the address it was stored at.
    fn add_pus_tm_from_reader(&mut self, pus_tm: &PusTmReader) -> Result<PoolAddr, PoolError>;
    /// Stores the telemetry given as a [PusTmCreator] and returns the address it was stored at.
    fn add_pus_tm_from_creator(&mut self, pus_tm: &PusTmCreator) -> Result<PoolAddr, PoolError>;
}
191
/// Generic interface for objects which can send the pool address of a packet instead of the
/// packet itself.
///
/// Implementors are required to be [Send].
pub trait PacketInPoolSender: Send {
    /// Sends the pool address `store_addr` on behalf of the component identified by
    /// `sender_id`.
    fn send_packet(
        &self,
        sender_id: ComponentId,
        store_addr: PoolAddr,
    ) -> Result<(), GenericSendError>;
}
200
#[cfg(feature = "alloc")]
pub mod alloc_mod {
    use alloc::vec::Vec;

    use super::*;

    /// Packet which owns its raw data as a [Vec], together with the ID of the sending
    /// component.
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub struct PacketAsVec {
        /// ID of the component which sent the packet.
        pub sender_id: ComponentId,
        /// Raw packet data.
        pub packet: Vec<u8>,
    }

    impl PacketAsVec {
        /// Builds a new owned packet from the sender's component ID and the raw data.
        pub fn new(sender_id: ComponentId, packet: Vec<u8>) -> Self {
            PacketAsVec { sender_id, packet }
        }
    }
}
221#[cfg(feature = "std")]
222pub mod std_mod {
223
224 use core::cell::RefCell;
225
226 #[cfg(feature = "crossbeam")]
227 use crossbeam_channel as cb;
228 use spacepackets::ecss::WritablePusPacket;
229 use thiserror::Error;
230
231 use crate::pool::PoolProvider;
232 use crate::pus::{EcssTmSender, EcssTmtcError, PacketSenderPusTc};
233
234 use super::*;
235
    /// Newtype around a [SharedStaticMemoryPool] for which the packet pool traits of this
    /// module are implemented.
    #[derive(Clone)]
    pub struct SharedPacketPool(pub SharedStaticMemoryPool);
240
241 impl SharedPacketPool {
242 pub fn new(pool: &SharedStaticMemoryPool) -> Self {
243 Self(pool.clone())
244 }
245 }
246
247 impl PusTcPool for SharedPacketPool {
248 fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<PoolAddr, PoolError> {
249 let mut pg = self.0.write().map_err(|_| PoolError::LockError)?;
250 let addr = pg.free_element(pus_tc.len_packed(), |buf| {
251 buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
252 })?;
253 Ok(addr)
254 }
255 }
256
257 impl PusTmPool for SharedPacketPool {
258 fn add_pus_tm_from_reader(&mut self, pus_tm: &PusTmReader) -> Result<PoolAddr, PoolError> {
259 let mut pg = self.0.write().map_err(|_| PoolError::LockError)?;
260 let addr = pg.free_element(pus_tm.len_packed(), |buf| {
261 buf[0..pus_tm.len_packed()].copy_from_slice(pus_tm.raw_data());
262 })?;
263 Ok(addr)
264 }
265
266 fn add_pus_tm_from_creator(
267 &mut self,
268 pus_tm: &PusTmCreator,
269 ) -> Result<PoolAddr, PoolError> {
270 let mut pg = self.0.write().map_err(|_| PoolError::LockError)?;
271 let mut result = Ok(0);
272 let addr = pg.free_element(pus_tm.len_written(), |buf| {
273 result = pus_tm.write_to_bytes(buf);
274 })?;
275 result?;
276 Ok(addr)
277 }
278 }
279
280 impl CcsdsPacketPool for SharedPacketPool {
281 fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result<PoolAddr, PoolError> {
282 let mut pg = self.0.write().map_err(|_| PoolError::LockError)?;
283 let addr = pg.free_element(tc_raw.len(), |buf| {
284 buf[0..tc_raw.len()].copy_from_slice(tc_raw);
285 })?;
286 Ok(addr)
287 }
288 }
289
290 #[cfg(feature = "std")]
291 impl PacketSenderRaw for mpsc::Sender<PacketAsVec> {
292 type Error = GenericSendError;
293
294 fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
295 self.send(PacketAsVec::new(sender_id, packet.to_vec()))
296 .map_err(|_| GenericSendError::RxDisconnected)
297 }
298 }
299
300 #[cfg(feature = "std")]
301 impl PacketSenderRaw for mpsc::SyncSender<PacketAsVec> {
302 type Error = GenericSendError;
303
304 fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> {
305 self.try_send(PacketAsVec::new(sender_id, tc_raw.to_vec()))
306 .map_err(|e| match e {
307 mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
308 mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
309 })
310 }
311 }
312
313 #[derive(Debug, Clone, PartialEq, Eq, Error)]
314 pub enum StoreAndSendError {
315 #[error("Store error: {0}")]
316 Store(#[from] PoolError),
317 #[error("Genreric send error: {0}")]
318 Send(#[from] GenericSendError),
319 }
320
321 pub use crate::pool::SharedStaticMemoryPool;
322
323 impl PacketInPoolSender for mpsc::Sender<PacketInPool> {
324 fn send_packet(
325 &self,
326 sender_id: ComponentId,
327 store_addr: PoolAddr,
328 ) -> Result<(), GenericSendError> {
329 self.send(PacketInPool::new(sender_id, store_addr))
330 .map_err(|_| GenericSendError::RxDisconnected)
331 }
332 }
333
334 impl PacketInPoolSender for mpsc::SyncSender<PacketInPool> {
335 fn send_packet(
336 &self,
337 sender_id: ComponentId,
338 store_addr: PoolAddr,
339 ) -> Result<(), GenericSendError> {
340 self.try_send(PacketInPool::new(sender_id, store_addr))
341 .map_err(|e| match e {
342 mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
343 mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
344 })
345 }
346 }
347
348 #[cfg(feature = "crossbeam")]
349 impl PacketInPoolSender for cb::Sender<PacketInPool> {
350 fn send_packet(
351 &self,
352 sender_id: ComponentId,
353 store_addr: PoolAddr,
354 ) -> Result<(), GenericSendError> {
355 self.try_send(PacketInPool::new(sender_id, store_addr))
356 .map_err(|e| match e {
357 cb::TrySendError::Full(_) => GenericSendError::QueueFull(None),
358 cb::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
359 })
360 }
361 }
362
    /// Combination of a [PacketInPoolSender] and a packet pool.
    ///
    /// The sender trait implementations for this type first store the packet in the pool and
    /// then send the resulting pool address using the wrapped sender.
    #[derive(Clone)]
    pub struct PacketSenderWithSharedPool<
        Sender: PacketInPoolSender = mpsc::SyncSender<PacketInPool>,
        PacketPool: CcsdsPacketPool = SharedPacketPool,
    > {
        /// Sender used to send the pool address of a stored packet.
        pub sender: Sender,
        /// Pool used to store packets. It is wrapped in a [RefCell] because the pool is mutated
        /// from sender methods which only take `&self`.
        pub shared_pool: RefCell<PacketPool>,
    }
373
374 impl<Sender: PacketInPoolSender> PacketSenderWithSharedPool<Sender, SharedPacketPool> {
375 pub fn new_with_shared_packet_pool(
376 packet_sender: Sender,
377 shared_pool: &SharedStaticMemoryPool,
378 ) -> Self {
379 Self {
380 sender: packet_sender,
381 shared_pool: RefCell::new(SharedPacketPool::new(shared_pool)),
382 }
383 }
384 }
385
386 impl<Sender: PacketInPoolSender, PacketStore: CcsdsPacketPool>
387 PacketSenderWithSharedPool<Sender, PacketStore>
388 {
389 pub fn new(packet_sender: Sender, shared_pool: PacketStore) -> Self {
390 Self {
391 sender: packet_sender,
392 shared_pool: RefCell::new(shared_pool),
393 }
394 }
395 }
396
397 impl<Sender: PacketInPoolSender, PacketStore: CcsdsPacketPool + Clone>
398 PacketSenderWithSharedPool<Sender, PacketStore>
399 {
400 pub fn shared_packet_store(&self) -> PacketStore {
401 let pool = self.shared_pool.borrow();
402 pool.clone()
403 }
404 }
405
406 impl<Sender: PacketInPoolSender, PacketStore: CcsdsPacketPool + Send> PacketSenderRaw
407 for PacketSenderWithSharedPool<Sender, PacketStore>
408 {
409 type Error = StoreAndSendError;
410
411 fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
412 let mut shared_pool = self.shared_pool.borrow_mut();
413 let store_addr = shared_pool.add_raw_tc(packet)?;
414 drop(shared_pool);
415 self.sender
416 .send_packet(sender_id, store_addr)
417 .map_err(StoreAndSendError::Send)?;
418 Ok(())
419 }
420 }
421
422 impl<Sender: PacketInPoolSender, PacketStore: CcsdsPacketPool + PusTcPool + Send>
423 PacketSenderPusTc for PacketSenderWithSharedPool<Sender, PacketStore>
424 {
425 type Error = StoreAndSendError;
426
427 fn send_pus_tc(
428 &self,
429 sender_id: ComponentId,
430 _: &SpHeader,
431 pus_tc: &PusTcReader,
432 ) -> Result<(), Self::Error> {
433 let mut shared_pool = self.shared_pool.borrow_mut();
434 let store_addr = shared_pool.add_raw_tc(pus_tc.raw_data())?;
435 drop(shared_pool);
436 self.sender
437 .send_packet(sender_id, store_addr)
438 .map_err(StoreAndSendError::Send)?;
439 Ok(())
440 }
441 }
442
443 impl<Sender: PacketInPoolSender, PacketStore: CcsdsPacketPool + Send> PacketSenderCcsds
444 for PacketSenderWithSharedPool<Sender, PacketStore>
445 {
446 type Error = StoreAndSendError;
447
448 fn send_ccsds(
449 &self,
450 sender_id: ComponentId,
451 _sp_header: &SpHeader,
452 tc_raw: &[u8],
453 ) -> Result<(), Self::Error> {
454 self.send_packet(sender_id, tc_raw)
455 }
456 }
457
458 impl<Sender: PacketInPoolSender, PacketStore: CcsdsPacketPool + PusTmPool + Send> EcssTmSender
459 for PacketSenderWithSharedPool<Sender, PacketStore>
460 {
461 fn send_tm(
462 &self,
463 sender_id: crate::ComponentId,
464 tm: crate::pus::PusTmVariant,
465 ) -> Result<(), crate::pus::EcssTmtcError> {
466 let send_addr = |store_addr: PoolAddr| {
467 self.sender
468 .send_packet(sender_id, store_addr)
469 .map_err(EcssTmtcError::Send)
470 };
471 match tm {
472 crate::pus::PusTmVariant::InStore(store_addr) => send_addr(store_addr),
473 crate::pus::PusTmVariant::Direct(tm_creator) => {
474 let mut pool = self.shared_pool.borrow_mut();
475 let store_addr = pool.add_pus_tm_from_creator(&tm_creator)?;
476 send_addr(store_addr)
477 }
478 }
479 }
480 }
481}
482
483#[cfg(test)]
484pub(crate) mod tests {
485 use alloc::vec;
486
487 use std::sync::RwLock;
488
489 use crate::pool::{PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};
490
491 use super::*;
492 use std::sync::mpsc;
493
494 pub(crate) fn send_with_sender<SendError>(
495 sender_id: ComponentId,
496 packet_sender: &(impl PacketSenderRaw<Error = SendError> + ?Sized),
497 packet: &[u8],
498 ) -> Result<(), SendError> {
499 packet_sender.send_packet(sender_id, packet)
500 }
501
502 #[test]
503 fn test_basic_mpsc_channel_sender_bounded() {
504 let (tx, rx) = mpsc::channel();
505 let some_packet = vec![1, 2, 3, 4, 5];
506 send_with_sender(1, &tx, &some_packet).expect("failed to send packet");
507 let rx_packet = rx.try_recv().unwrap();
508 assert_eq!(some_packet, rx_packet.packet);
509 assert_eq!(1, rx_packet.sender_id);
510 }
511
512 #[test]
513 fn test_basic_mpsc_channel_receiver_dropped() {
514 let (tx, rx) = mpsc::channel();
515 let some_packet = vec![1, 2, 3, 4, 5];
516 drop(rx);
517 let result = send_with_sender(2, &tx, &some_packet);
518 assert!(result.is_err());
519 matches!(result.unwrap_err(), GenericSendError::RxDisconnected);
520 }
521
522 #[test]
523 fn test_basic_mpsc_sync_sender() {
524 let (tx, rx) = mpsc::sync_channel(3);
525 let some_packet = vec![1, 2, 3, 4, 5];
526 send_with_sender(3, &tx, &some_packet).expect("failed to send packet");
527 let rx_packet = rx.try_recv().unwrap();
528 assert_eq!(some_packet, rx_packet.packet);
529 assert_eq!(3, rx_packet.sender_id);
530 }
531
532 #[test]
533 fn test_basic_mpsc_sync_sender_receiver_dropped() {
534 let (tx, rx) = mpsc::sync_channel(3);
535 let some_packet = vec![1, 2, 3, 4, 5];
536 drop(rx);
537 let result = send_with_sender(0, &tx, &some_packet);
538 assert!(result.is_err());
539 matches!(result.unwrap_err(), GenericSendError::RxDisconnected);
540 }
541
542 #[test]
543 fn test_basic_mpsc_sync_sender_queue_full() {
544 let (tx, rx) = mpsc::sync_channel(1);
545 let some_packet = vec![1, 2, 3, 4, 5];
546 send_with_sender(0, &tx, &some_packet).expect("failed to send packet");
547 let result = send_with_sender(1, &tx, &some_packet);
548 assert!(result.is_err());
549 matches!(result.unwrap_err(), GenericSendError::QueueFull(None));
550 let rx_packet = rx.try_recv().unwrap();
551 assert_eq!(some_packet, rx_packet.packet);
552 }
553
554 #[test]
555 fn test_basic_shared_store_sender_unbounded_sender() {
556 let (tc_tx, tc_rx) = mpsc::channel();
557 let pool_cfg = StaticPoolConfig::new(vec![(2, 8)], true);
558 let shared_pool = SharedPacketPool::new(&SharedStaticMemoryPool::new(RwLock::new(
559 StaticMemoryPool::new(pool_cfg),
560 )));
561 let some_packet = vec![1, 2, 3, 4, 5];
562 let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone());
563 send_with_sender(5, &tc_sender, &some_packet).expect("failed to send packet");
564 let packet_in_pool = tc_rx.try_recv().unwrap();
565 let mut pool = shared_pool.0.write().unwrap();
566 let read_guard = pool.read_with_guard(packet_in_pool.store_addr);
567 assert_eq!(read_guard.read_as_vec().unwrap(), some_packet);
568 assert_eq!(packet_in_pool.sender_id, 5)
569 }
570
571 #[test]
572 fn test_basic_shared_store_sender() {
573 let (tc_tx, tc_rx) = mpsc::sync_channel(10);
574 let pool_cfg = StaticPoolConfig::new(vec![(2, 8)], true);
575 let shared_pool = SharedPacketPool::new(&SharedStaticMemoryPool::new(RwLock::new(
576 StaticMemoryPool::new(pool_cfg),
577 )));
578 let some_packet = vec![1, 2, 3, 4, 5];
579 let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone());
580 send_with_sender(5, &tc_sender, &some_packet).expect("failed to send packet");
581 let packet_in_pool = tc_rx.try_recv().unwrap();
582 let mut pool = shared_pool.0.write().unwrap();
583 let read_guard = pool.read_with_guard(packet_in_pool.store_addr);
584 assert_eq!(read_guard.read_as_vec().unwrap(), some_packet);
585 assert_eq!(packet_in_pool.sender_id, 5)
586 }
587
588 #[test]
589 fn test_basic_shared_store_sender_rx_dropped() {
590 let (tc_tx, tc_rx) = mpsc::sync_channel(10);
591 let pool_cfg = StaticPoolConfig::new(vec![(2, 8)], true);
592 let shared_pool = SharedPacketPool::new(&SharedStaticMemoryPool::new(RwLock::new(
593 StaticMemoryPool::new(pool_cfg),
594 )));
595 let some_packet = vec![1, 2, 3, 4, 5];
596 drop(tc_rx);
597 let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone());
598 let result = send_with_sender(2, &tc_sender, &some_packet);
599 assert!(result.is_err());
600 matches!(
601 result.unwrap_err(),
602 StoreAndSendError::Send(GenericSendError::RxDisconnected)
603 );
604 }
605
606 #[test]
607 fn test_basic_shared_store_sender_queue_full() {
608 let (tc_tx, tc_rx) = mpsc::sync_channel(1);
609 let pool_cfg = StaticPoolConfig::new(vec![(2, 8)], true);
610 let shared_pool = SharedPacketPool::new(&SharedStaticMemoryPool::new(RwLock::new(
611 StaticMemoryPool::new(pool_cfg),
612 )));
613 let some_packet = vec![1, 2, 3, 4, 5];
614 let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone());
615 send_with_sender(3, &tc_sender, &some_packet).expect("failed to send packet");
616 let result = send_with_sender(3, &tc_sender, &some_packet);
617 assert!(result.is_err());
618 matches!(
619 result.unwrap_err(),
620 StoreAndSendError::Send(GenericSendError::RxDisconnected)
621 );
622 let packet_in_pool = tc_rx.try_recv().unwrap();
623 let mut pool = shared_pool.0.write().unwrap();
624 let read_guard = pool.read_with_guard(packet_in_pool.store_addr);
625 assert_eq!(read_guard.read_as_vec().unwrap(), some_packet);
626 assert_eq!(packet_in_pool.sender_id, 3);
627 }
628
629 #[test]
630 fn test_basic_shared_store_store_error() {
631 let (tc_tx, tc_rx) = mpsc::sync_channel(1);
632 let pool_cfg = StaticPoolConfig::new(vec![(1, 8)], true);
633 let shared_pool = SharedPacketPool::new(&SharedStaticMemoryPool::new(RwLock::new(
634 StaticMemoryPool::new(pool_cfg),
635 )));
636 let some_packet = vec![1, 2, 3, 4, 5];
637 let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone());
638 send_with_sender(4, &tc_sender, &some_packet).expect("failed to send packet");
639 let result = send_with_sender(4, &tc_sender, &some_packet);
640 assert!(result.is_err());
641 matches!(
642 result.unwrap_err(),
643 StoreAndSendError::Store(PoolError::StoreFull(..))
644 );
645 let packet_in_pool = tc_rx.try_recv().unwrap();
646 let mut pool = shared_pool.0.write().unwrap();
647 let read_guard = pool.read_with_guard(packet_in_pool.store_addr);
648 assert_eq!(read_guard.read_as_vec().unwrap(), some_packet);
649 assert_eq!(packet_in_pool.sender_id, 4);
650 }
651}