1use core::marker::PhantomData;
4use nodo::{core::BinaryFormat, prelude::*};
5
/// Codelet state that turns serialized byte messages into messages carrying a `T`.
///
/// `BF` is the binary format used for decoding. `T` is the decoded value type; it is not stored
/// and only appears through the `PhantomData` marker.
pub struct Deserializer<T, BF> {
    /// Format whose `deserialize` method is applied to every incoming payload.
    format: BF,
    /// Zero-sized marker tying the otherwise unused type parameter `T` to the struct.
    marker: PhantomData<T>,
}
11
/// Configuration used by the `Codelet` implementation of `Deserializer`.
#[derive(Config)]
pub struct DeserializerConfig {
    /// Size handed to both the receive and the transmit queue in `build_bundles`.
    // NOTE(review): `#[hidden]` is consumed by the `Config` derive; presumably it keeps this
    // field out of the externally exposed configuration — confirm against the nodo docs.
    #[hidden]
    pub queue_size: usize,
}
18
19impl Default for DeserializerConfig {
20 fn default() -> Self {
21 Self { queue_size: 10 }
22 }
23}
24
25impl<T, BF> Deserializer<T, BF> {
26 pub fn new(format: BF) -> Self {
27 Self {
28 format,
29 marker: PhantomData::default(),
30 }
31 }
32}
33
34impl<T, BF> Codelet for Deserializer<T, BF>
35where
36 T: Send + Sync + Clone,
37 BF: Send + BinaryFormat<T>,
38{
39 type Status = DefaultStatus;
40 type Config = DeserializerConfig;
41 type Rx = DoubleBufferRx<Message<Vec<u8>>>;
42 type Tx = DoubleBufferTx<Message<T>>;
43 type Signals = ();
44
45 fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
46 (
47 DoubleBufferRx::new(
48 OverflowPolicy::Forget(cfg.queue_size),
49 RetentionPolicy::Keep,
50 ),
51 DoubleBufferTx::new(cfg.queue_size),
52 )
53 }
54
55 fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
56 if rx.is_empty() {
57 SKIPPED
58 } else {
59 while let Some(message) = rx.try_pop() {
60 tx.push(Message {
61 seq: message.seq,
62 stamp: Stamp {
63 acqtime: message.stamp.acqtime,
64 pubtime: cx.clocks.app_mono.now(),
65 },
66 value: self.format.deserialize(&message.value)?,
67 })?;
68 }
69 SUCCESS
70 }
71 }
72}