nodo_std 0.18.5

Standard codelets for NODO
Documentation
// Copyright 2023 David Weikersdorfer

use core::marker::PhantomData;
use nodo::{
    core::{BinaryFormat, SerializedMessage},
    prelude::*,
};

/// A codelet which serializes a message.
///
/// `T` is the payload type of the incoming messages and `BF` is the binary
/// format whose `serialize` method turns a `&T` into the outgoing payload.
pub struct Serializer<T, BF> {
    /// Format instance used to serialize the value of every received message.
    format: BF,
    /// Zero-sized marker which ties the codelet to `T` without storing a `T`.
    marker: PhantomData<T>,
}

/// Configuration of the [`Serializer`] codelet.
#[derive(Config)]
pub struct SerializerConfig {
    #[hidden]
    /// Maximum number of messages which can be queued before messages are dropped.
    ///
    /// The value is passed both to the overflow policy of the receiving
    /// endpoint and to the constructor of the transmitting endpoint.
    pub queue_size: usize,
}

impl Default for SerializerConfig {
    fn default() -> Self {
        Self { queue_size: 10 }
    }
}

impl<T, BF> Serializer<T, BF> {
    /// Creates a serializer codelet which encodes message values with `format`.
    pub fn new(format: BF) -> Self {
        Self {
            format,
            // `PhantomData` is a unit struct, so it is named directly instead of
            // going through `Default::default()`.
            marker: PhantomData,
        }
    }
}

impl<T, BF> Codelet for Serializer<T, BF>
where
    T: Send + Sync,
    BF: Send + BinaryFormat<T>,
{
    type Status = DefaultStatus;
    type Config = SerializerConfig;
    type Rx = DoubleBufferRx<Message<T>>;
    type Tx = DoubleBufferTx<Message<Vec<u8>>>;
    type Signals = ();

    /// Builds the receiving and transmitting endpoints, both parameterized by
    /// `cfg.queue_size`.
    fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
        let inbound = DoubleBufferRx::new(
            OverflowPolicy::Forget(cfg.queue_size),
            RetentionPolicy::Keep,
        );
        let outbound = DoubleBufferTx::new(cfg.queue_size);
        (inbound, outbound)
    }

    /// Serializes every pending message from `rx` and pushes the result to `tx`.
    ///
    /// Returns `SKIPPED` when `rx` holds no messages and `SUCCESS` once all
    /// pending messages were forwarded. Each outgoing message keeps the sequence
    /// number and acquisition time of its source and gets a publish time read
    /// from `cx.clocks.app_mono`.
    ///
    /// # Errors
    ///
    /// A failure of `serialize` or of `tx.push` is propagated immediately.
    fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
        // Nothing to do this cycle.
        if rx.is_empty() {
            return SKIPPED;
        }

        while let Some(incoming) = rx.try_pop() {
            let seq = incoming.seq;
            // The clock is read before serialization, so the publish time does
            // not include the time spent encoding the value.
            let stamp = Stamp {
                acqtime: incoming.stamp.acqtime,
                pubtime: cx.clocks.app_mono.now(),
            };
            let value = self.format.serialize(&incoming.value)?;

            tx.push(SerializedMessage { seq, stamp, value })?;
        }

        SUCCESS
    }
}