use core::marker::PhantomData;
use nodo::{
core::{BinaryFormat, SerializedMessage},
prelude::*,
};
pub struct Serializer<T, BF> {
format: BF,
marker: PhantomData<T>,
}
#[derive(Config)]
pub struct SerializerConfig {
#[hidden]
pub queue_size: usize,
}
impl Default for SerializerConfig {
fn default() -> Self {
Self { queue_size: 10 }
}
}
impl<T, BF> Serializer<T, BF> {
pub fn new(format: BF) -> Self {
Self {
format,
marker: PhantomData::default(),
}
}
}
impl<T, BF> Codelet for Serializer<T, BF>
where
T: Send + Sync,
BF: Send + BinaryFormat<T>,
{
type Status = DefaultStatus;
type Config = SerializerConfig;
type Rx = DoubleBufferRx<Message<T>>;
type Tx = DoubleBufferTx<Message<Vec<u8>>>;
type Signals = ();
fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
(
DoubleBufferRx::new(
OverflowPolicy::Forget(cfg.queue_size),
RetentionPolicy::Keep,
),
DoubleBufferTx::new(cfg.queue_size),
)
}
fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
if rx.is_empty() {
SKIPPED
} else {
while let Some(message) = rx.try_pop() {
tx.push(SerializedMessage {
seq: message.seq,
stamp: Stamp {
acqtime: message.stamp.acqtime,
pubtime: cx.clocks.app_mono.now(),
},
value: self.format.serialize(&message.value)?,
})?;
}
SUCCESS
}
}
}