1use core::marker::PhantomData;
4use nodo::{
5 core::{BinaryFormat, SerializedMessage},
6 prelude::*,
7};
8
9pub struct Serializer<T, BF> {
11 format: BF,
12 marker: PhantomData<T>,
13}
14
15#[derive(Config)]
16pub struct SerializerConfig {
17 #[hidden]
18 pub queue_size: usize,
20}
21
22impl Default for SerializerConfig {
23 fn default() -> Self {
24 Self { queue_size: 10 }
25 }
26}
27
28impl<T, BF> Serializer<T, BF> {
29 pub fn new(format: BF) -> Self {
30 Self {
31 format,
32 marker: PhantomData::default(),
33 }
34 }
35}
36
37impl<T, BF> Codelet for Serializer<T, BF>
38where
39 T: Send + Sync,
40 BF: Send + BinaryFormat<T>,
41{
42 type Status = DefaultStatus;
43 type Config = SerializerConfig;
44 type Rx = DoubleBufferRx<Message<T>>;
45 type Tx = DoubleBufferTx<Message<Vec<u8>>>;
46 type Signals = ();
47
48 fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
49 (
50 DoubleBufferRx::new(
51 OverflowPolicy::Forget(cfg.queue_size),
52 RetentionPolicy::Keep,
53 ),
54 DoubleBufferTx::new(cfg.queue_size),
55 )
56 }
57
58 fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
59 if rx.is_empty() {
60 SKIPPED
61 } else {
62 while let Some(message) = rx.try_pop() {
63 tx.push(SerializedMessage {
64 seq: message.seq,
65 stamp: Stamp {
66 acqtime: message.stamp.acqtime,
67 pubtime: cx.clocks.app_mono.now(),
68 },
69 value: self.format.serialize(&message.value)?,
70 })?;
71 }
72 SUCCESS
73 }
74 }
75}