nodo_std 0.18.5

Standard codelets for NODO
Documentation
// Copyright 2023 David Weikersdorfer

use core::marker::PhantomData;
use eyre::ensure;
use nodo::{channels::SyncResult, prelude::*};

/// A multiplexer has multiple input channels and a single output channel. Messages received on
/// the selected input channel are sent on the output channel and messages on other inputs are
/// discarded. The channel can be selected via a separate input channel.
///
/// The value of every forwarded message is passed through the mapping function `F`, which
/// turns an input value of type `T` into an output value of type `S`.
pub struct Multiplexer<T, S, F> {
    /// Index of the currently selected input channel; `None` means nothing is forwarded.
    selection: Option<usize>,
    /// Function applied to the value of every forwarded message.
    map_f: F,
    /// Marker which ties the otherwise unused type parameters `T` and `S` to the struct.
    pd: PhantomData<(T, S)>,
}

impl<T> Default for Multiplexer<T, T, fn(T) -> T> {
    /// Creates a multiplexer which forwards message values unchanged and starts out with no
    /// input channel selected.
    fn default() -> Self {
        // A non-capturing closure coerces to a plain function pointer.
        let identity: fn(T) -> T = |value| value;
        Self {
            selection: None,
            map_f: identity,
            pd: PhantomData,
        }
    }
}

impl<T, S, F> Multiplexer<T, S, F> {
    /// Creates a multiplexer which applies `map_f` to the value of every forwarded message.
    ///
    /// The stored selection starts out as `None`.
    pub fn new(map_f: F) -> Self {
        let selection = None;
        let pd = PhantomData;
        Self {
            selection,
            map_f,
            pd,
        }
    }
}

/// Message sent on the selection channel of a multiplexer.
///
/// Wraps the index of the input channel to select; `None` deselects all inputs.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MultiplexerSelection(Option<usize>);

impl From<usize> for MultiplexerSelection {
    fn from(other: usize) -> Self {
        Self(Some(other))
    }
}

impl From<Option<usize>> for MultiplexerSelection {
    fn from(other: Option<usize>) -> Self {
        Self(other)
    }
}

/// Configuration of the [`Multiplexer`] codelet.
#[derive(Config, Default)]
pub struct MultiplexerConfig {
    /// Number of data input channels created when the channel bundles are built.
    // NOTE(review): the exact effect of `#[hidden]` is defined by the `Config` derive and is
    // not visible in this file - confirm before relying on it.
    #[hidden]
    pub initial_input_count: usize,

    /// Selection applied when the codelet starts; `None` means no input is selected.
    #[hidden]
    pub initial_selection: Option<usize>,
}

/// Receiving side of a [`Multiplexer`]: a list of data input channels plus one channel on
/// which selection messages are received.
pub struct MultiplexerRx<T> {
    /// Data input channels; the position in this list is the index used for selection.
    inputs: Vec<MessageRx<T>>,
    /// Names of the data input channels, kept in the same order as `inputs`.
    names: Vec<String>,
    /// Channel on which [`MultiplexerSelection`] messages are received.
    selection: MessageRx<MultiplexerSelection>,
}

impl<T> MultiplexerRx<T> {
    /// Creates a bundle with `count` data input channels and one selection channel.
    ///
    /// The data input channels are named after their index ("0", "1", ...).
    pub fn new(count: usize) -> Self {
        let mut inputs = Vec::with_capacity(count);
        let mut names = Vec::with_capacity(count);
        for i in 0..count {
            inputs.push(DoubleBufferRx::new_auto_size());
            names.push(i.to_string());
        }

        Self {
            inputs,
            names,
            selection: DoubleBufferRx::new_latest(),
        }
    }

    /// Mutable access to the channel on which selection messages are received.
    pub fn selection_mut(&mut self) -> &mut MessageRx<MultiplexerSelection> {
        let Self { selection, .. } = self;
        selection
    }

    /// Mutable access to the data input channel at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not smaller than the number of data input channels.
    pub fn channel_mut(&mut self, index: usize) -> &mut MessageRx<T> {
        let Self { inputs, .. } = self;
        &mut inputs[index]
    }

    /// Appends a new data input channel, named after the number of names registered so far,
    /// and returns mutable access to it.
    pub fn new_channel_mut(&mut self) -> &mut MessageRx<T> {
        let position = self.inputs.len();
        let name = self.names.len().to_string();
        self.names.push(name);
        self.inputs.push(DoubleBufferRx::new_auto_size());
        // `position` is the slot which was just filled by the push above.
        &mut self.inputs[position]
    }
}

/// Channel bundle view of the multiplexer inputs.
///
/// Channels are indexed as follows: indices `0..inputs.len()` are the data input channels and
/// index `inputs.len()` is the selection channel.
impl<T: Send + Sync> nodo::channels::RxBundle for MultiplexerRx<T> {
    /// Number of channels in the bundle: all data inputs plus the selection channel.
    fn channel_count(&self) -> usize {
        self.inputs.len() + 1
    }

    /// Name of the channel with the given index.
    ///
    /// The selection channel (the last index) has no entry in `names`, so it is handled
    /// explicitly instead of indexing out of bounds.
    fn name(&self, index: usize) -> &str {
        if index == self.inputs.len() {
            "selection"
        } else {
            &self.names[index]
        }
    }

    /// Number of messages currently held by the channel with the given index.
    ///
    /// The selection channel (the last index) is not part of `inputs`, so it is handled
    /// explicitly instead of indexing out of bounds.
    fn inbox_message_count(&self, index: usize) -> usize {
        if index == self.inputs.len() {
            self.selection.len()
        } else {
            self.inputs[index].len()
        }
    }

    /// Synchronizes all channels and stores one result per channel in `results`.
    ///
    /// The results of the data inputs are written in order and the result of the selection
    /// channel is written to the last slot.
    // NOTE(review): presumably `results` has `channel_count()` entries - confirm against the
    // caller in the nodo runtime.
    fn sync_all(&mut self, results: &mut [SyncResult]) {
        for (i, channel) in self.inputs.iter_mut().enumerate() {
            results[i] = channel.sync();
        }
        results[results.len() - 1] = self.selection.sync();
    }

    /// Reports the connection state of every data input and of the selection channel.
    fn check_connection(&self) -> nodo::channels::ConnectionCheck {
        let mut cc = nodo::channels::ConnectionCheck::new(self.inputs.len() + 1);
        for (i, channel) in self.inputs.iter().enumerate() {
            cc.mark(i, channel.is_connected());
        }
        cc.mark(self.inputs.len(), self.selection.is_connected());
        cc
    }
}

impl<T, S, F> Codelet for Multiplexer<T, S, F>
where
    T: Send + Sync,
    S: Send + Sync + Clone,
    F: Fn(T) -> S + Send,
{
    type Status = DefaultStatus;
    type Config = MultiplexerConfig;
    type Rx = MultiplexerRx<T>;
    type Tx = MessageTx<S>;
    type Signals = ();

    /// Builds the receiving bundle with the configured number of data inputs and the single
    /// output channel.
    fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
        let rx = MultiplexerRx::new(cfg.initial_input_count);
        let tx = MessageTx::new_auto_size();
        (rx, tx)
    }

    /// Applies the configured initial selection; fails if it does not refer to an existing
    /// data input channel.
    fn start(&mut self, cx: Context<Self>, rx: &mut Self::Rx, _tx: &mut Self::Tx) -> Outcome {
        let input_count = rx.inputs.len();
        self.update_selection(cx.config.initial_selection, input_count)?;
        SUCCESS
    }

    /// Processes a pending selection message, forwards the messages of the selected input
    /// through the mapping function and throws away the messages of all other inputs.
    fn step(&mut self, _cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
        // A selection message, if there is one, takes effect before anything is forwarded.
        if let Some(request) = rx.selection.try_pop() {
            let MultiplexerSelection(requested) = request.value;
            self.update_selection(requested, rx.inputs.len())?;
        }

        let active = self.selection;

        // Messages of the selected input keep their acquisition time; only the value is mapped.
        if let Some(index) = active {
            let convert = &self.map_f;
            let forwarded = rx.inputs[index]
                .drain(..)
                .map(|msg| (msg.stamp.acqtime, convert(msg.value)));
            tx.push_many(forwarded)?;
        }

        // Everything received on the remaining inputs is dropped.
        rx.inputs
            .iter_mut()
            .enumerate()
            .filter(|(i, _)| active != Some(*i))
            .for_each(|(_, channel)| {
                channel.drain(..);
            });

        SUCCESS
    }
}

impl<T, S, F> Multiplexer<T, S, F> {
    /// Stores `selection` as the new selection after checking it against `channel_count`.
    ///
    /// A `Some` index must be smaller than `channel_count`; otherwise an error is returned and
    /// the stored selection is left untouched. `None` is always accepted.
    fn update_selection(&mut self, selection: Option<usize>, channel_count: usize) -> Outcome {
        // Validate first so that a rejected request does not change the current state.
        if let Some(selection) = selection {
            ensure!(
                selection < channel_count,
                "invalid input channel {selection}. channel count: {}",
                channel_count
            );
        }

        self.selection = selection;
        SUCCESS
    }
}