Skip to main content

nodo_std/
deserializer.rs

1// Copyright 2023 David Weikersdorfer
2
3use core::marker::PhantomData;
4use nodo::{core::BinaryFormat, prelude::*};
5
6/// A codelet which serializes a message
7pub struct Deserializer<T, BF> {
8    format: BF,
9    marker: PhantomData<T>,
10}
11
12#[derive(Config)]
13pub struct DeserializerConfig {
14    #[hidden]
15    /// Maximum number of messages which can be queued before messages are dropped.
16    pub queue_size: usize,
17}
18
19impl Default for DeserializerConfig {
20    fn default() -> Self {
21        Self { queue_size: 10 }
22    }
23}
24
25impl<T, BF> Deserializer<T, BF> {
26    pub fn new(format: BF) -> Self {
27        Self {
28            format,
29            marker: PhantomData::default(),
30        }
31    }
32}
33
34impl<T, BF> Codelet for Deserializer<T, BF>
35where
36    T: Send + Sync + Clone,
37    BF: Send + BinaryFormat<T>,
38{
39    type Status = DefaultStatus;
40    type Config = DeserializerConfig;
41    type Rx = DoubleBufferRx<Message<Vec<u8>>>;
42    type Tx = DoubleBufferTx<Message<T>>;
43    type Signals = ();
44
45    fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
46        (
47            DoubleBufferRx::new(
48                OverflowPolicy::Forget(cfg.queue_size),
49                RetentionPolicy::Keep,
50            ),
51            DoubleBufferTx::new(cfg.queue_size),
52        )
53    }
54
55    fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
56        if rx.is_empty() {
57            SKIPPED
58        } else {
59            while let Some(message) = rx.try_pop() {
60                tx.push(Message {
61                    seq: message.seq,
62                    stamp: Stamp {
63                        acqtime: message.stamp.acqtime,
64                        pubtime: cx.clocks.app_mono.now(),
65                    },
66                    value: self.format.deserialize(&message.value)?,
67                })?;
68            }
69            SUCCESS
70        }
71    }
72}