nodo_std/
serializer.rs

1// Copyright 2023 David Weikersdorfer
2
3use core::marker::PhantomData;
4use nodo::{
5    core::{BinaryFormat, SerializedMessage},
6    prelude::*,
7};
8
/// Codelet which converts incoming messages of type `T` into their binary form.
///
/// `BF` is the binary format used for the encoding. `T` occurs only as a type-level marker;
/// no value of type `T` is stored inside the codelet.
pub struct Serializer<T, BF> {
    /// Format instance which performs the actual encoding.
    format: BF,

    /// Zero-sized marker binding the codelet to the message payload type `T`.
    marker: PhantomData<T>,
}
14
15#[derive(Config)]
16pub struct SerializerConfig {
17    #[hidden]
18    /// Maximum number of messages which can be queued before messages are dropped.
19    pub queue_size: usize,
20}
21
22impl Default for SerializerConfig {
23    fn default() -> Self {
24        Self { queue_size: 10 }
25    }
26}
27
28impl<T, BF> Serializer<T, BF> {
29    pub fn new(format: BF) -> Self {
30        Self {
31            format,
32            marker: PhantomData::default(),
33        }
34    }
35}
36
37impl<T, BF> Codelet for Serializer<T, BF>
38where
39    T: Send + Sync,
40    BF: Send + BinaryFormat<T>,
41{
42    type Status = DefaultStatus;
43    type Config = SerializerConfig;
44    type Rx = DoubleBufferRx<Message<T>>;
45    type Tx = DoubleBufferTx<Message<Vec<u8>>>;
46    type Signals = ();
47
48    fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
49        (
50            DoubleBufferRx::new(
51                OverflowPolicy::Forget(cfg.queue_size),
52                RetentionPolicy::Keep,
53            ),
54            DoubleBufferTx::new(cfg.queue_size),
55        )
56    }
57
58    fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
59        if rx.is_empty() {
60            SKIPPED
61        } else {
62            while let Some(message) = rx.try_pop() {
63                tx.push(SerializedMessage {
64                    seq: message.seq,
65                    stamp: Stamp {
66                        acqtime: message.stamp.acqtime,
67                        pubtime: cx.clocks.app_mono.now(),
68                    },
69                    value: self.format.serialize(&message.value)?,
70                })?;
71            }
72            SUCCESS
73        }
74    }
75}