use core::marker::PhantomData;
use eyre::ensure;
use nodo::{channels::SyncResult, prelude::*};
/// Codelet state that forwards messages from one selected input channel to a single output.
///
/// `T` is the value type of the inputs, `S` the value type of the output and `F` the mapping
/// applied to every forwarded value.
pub struct Multiplexer<T, S, F> {
/// Index of the input channel currently being forwarded; `None` forwards nothing.
selection: Option<usize>,
/// Function applied to each message value before it is published.
map_f: F,
/// Ties the otherwise unused type parameters `T` and `S` to the struct.
pd: PhantomData<(T, S)>,
}
/// Pass-through multiplexer: input and output types are the same and values are forwarded
/// unchanged. No input is selected initially.
impl<T> Default for Multiplexer<T, T, fn(T) -> T> {
    fn default() -> Self {
        Self {
            pd: PhantomData,
            // The identity function coerces to the `fn(T) -> T` pointer type of this impl.
            map_f: core::convert::identity,
            selection: None,
        }
    }
}
impl<T, S, F> Multiplexer<T, S, F> {
pub fn new(map_f: F) -> Self {
Self {
selection: None,
map_f,
pd: PhantomData,
}
}
}
/// Message telling the multiplexer which input channel to forward.
///
/// Wraps the index of the input channel to forward, or `None` to forward nothing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MultiplexerSelection(Option<usize>);

impl From<usize> for MultiplexerSelection {
    /// Selects the input channel with the given index.
    fn from(index: usize) -> Self {
        Self(Some(index))
    }
}

impl From<Option<usize>> for MultiplexerSelection {
    /// Selects the given input channel, or no channel at all for `None`.
    fn from(selection: Option<usize>) -> Self {
        Self(selection)
    }
}
/// Configuration of the multiplexer codelet.
// NOTE(review): `#[hidden]` is interpreted by the `Config` derive; its exact effect is not
// visible in this file - confirm against the derive macro documentation.
#[derive(Config, Default)]
pub struct MultiplexerConfig {
/// Number of input channels created when the receive bundle is built.
#[hidden]
pub initial_input_count: usize,
/// Selection applied when the codelet starts; `None` starts with no input selected.
#[hidden]
pub initial_selection: Option<usize>,
}
/// Receive bundle of the multiplexer: the data inputs plus one selection channel.
pub struct MultiplexerRx<T> {
/// Data input channels; the multiplexer selects among them by index.
inputs: Vec<MessageRx<T>>,
/// One name per input channel: the decimal index the channel was created with.
names: Vec<String>,
/// Channel on which selection changes are received.
selection: MessageRx<MultiplexerSelection>,
}
impl<T> MultiplexerRx<T> {
    /// Creates a bundle with `count` input channels named `"0"`, `"1"`, ... and a selection
    /// channel.
    pub fn new(count: usize) -> Self {
        let inputs: Vec<MessageRx<T>> = (0..count)
            .map(|_| DoubleBufferRx::new_auto_size())
            .collect();
        let names: Vec<String> = (0..count).map(|index| index.to_string()).collect();
        Self {
            inputs,
            names,
            selection: DoubleBufferRx::new_latest(),
        }
    }

    /// Mutable access to the channel on which selection changes are received.
    pub fn selection_mut(&mut self) -> &mut MessageRx<MultiplexerSelection> {
        &mut self.selection
    }

    /// Mutable access to the input channel at `index`.
    ///
    /// # Panics
    /// Panics if `index` is not smaller than the current number of input channels.
    pub fn channel_mut(&mut self, index: usize) -> &mut MessageRx<T> {
        &mut self.inputs[index]
    }

    /// Appends a new input channel, named after its index, and returns it.
    pub fn new_channel_mut(&mut self) -> &mut MessageRx<T> {
        // The name is derived from the number of names registered so far.
        let name = self.names.len().to_string();
        let slot = self.inputs.len();
        self.inputs.push(DoubleBufferRx::new_auto_size());
        self.names.push(name);
        &mut self.inputs[slot]
    }
}
/// Exposes the data inputs and the selection channel as one bundle.
///
/// Channel indices `0..inputs.len()` are the data inputs; index `inputs.len()` is the
/// selection channel.
impl<T: Send + Sync> nodo::channels::RxBundle for MultiplexerRx<T> {
    /// Number of channels in the bundle: all inputs plus the selection channel.
    fn channel_count(&self) -> usize {
        self.inputs.len() + 1
    }

    /// Name of the channel at `index`.
    ///
    /// # Panics
    /// Panics if `index` is greater than the number of input channels.
    fn name(&self, index: usize) -> &str {
        if index == self.inputs.len() {
            // The selection channel is counted by `channel_count` but has no entry in `names`.
            "selection"
        } else {
            &self.names[index]
        }
    }

    /// Number of messages waiting in the channel at `index`.
    ///
    /// # Panics
    /// Panics if `index` is greater than the number of input channels.
    fn inbox_message_count(&self, index: usize) -> usize {
        if index == self.inputs.len() {
            self.selection.len()
        } else {
            self.inputs[index].len()
        }
    }

    /// Synchronizes every channel and stores the result at the channel's index in `results`.
    ///
    /// # Panics
    /// Panics if `results` holds fewer than `channel_count()` entries.
    fn sync_all(&mut self, results: &mut [SyncResult]) {
        for (i, channel) in self.inputs.iter_mut().enumerate() {
            results[i] = channel.sync();
        }
        // Use the same index for the selection channel as `check_connection` does, instead of
        // relying on `results` having exactly `channel_count()` entries.
        results[self.inputs.len()] = self.selection.sync();
    }

    /// Reports the connection state of every channel, selection channel last.
    fn check_connection(&self) -> nodo::channels::ConnectionCheck {
        let mut cc = nodo::channels::ConnectionCheck::new(self.inputs.len() + 1);
        for (i, channel) in self.inputs.iter().enumerate() {
            cc.mark(i, channel.is_connected());
        }
        cc.mark(self.inputs.len(), self.selection.is_connected());
        cc
    }
}
/// Codelet implementation: forwards the selected input through `map_f` and discards the
/// messages of every other input.
impl<T, S, F> Codelet for Multiplexer<T, S, F>
where
    T: Send + Sync,
    S: Send + Sync + Clone,
    F: Fn(T) -> S + Send,
{
    type Status = DefaultStatus;
    type Config = MultiplexerConfig;
    type Rx = MultiplexerRx<T>;
    type Tx = MessageTx<S>;
    type Signals = ();

    /// Builds the receive bundle with the configured number of inputs and the output channel.
    fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
        let rx = MultiplexerRx::new(cfg.initial_input_count);
        let tx = MessageTx::new_auto_size();
        (rx, tx)
    }

    /// Applies the configured initial selection; fails if it names a non-existent input.
    fn start(&mut self, cx: Context<Self>, rx: &mut Self::Rx, _tx: &mut Self::Tx) -> Outcome {
        let channel_count = rx.inputs.len();
        self.update_selection(cx.config.initial_selection, channel_count)?;
        SUCCESS
    }

    /// Processes a pending selection change, forwards the selected input and empties the rest.
    fn step(&mut self, _cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
        // A selection change takes effect before any data is forwarded in this step.
        if let Some(request) = rx.selection.try_pop() {
            let requested = request.value.0;
            self.update_selection(requested, rx.inputs.len())?;
        }

        let active = self.selection;

        // Forward everything queued on the selected input, keeping each acquisition time.
        if let Some(index) = active {
            let forwarded = rx.inputs[index]
                .drain(..)
                .map(|msg| (msg.stamp.acqtime, (self.map_f)(msg.value)));
            tx.push_many(forwarded)?;
        }

        // Messages on inputs that are not selected are dropped.
        let unselected = rx
            .inputs
            .iter_mut()
            .enumerate()
            .filter(|(i, _)| Some(*i) != active);
        for (_, channel) in unselected {
            channel.drain(..);
        }

        SUCCESS
    }
}
impl<T, S, F> Multiplexer<T, S, F> {
    /// Stores `selection` as the active input after checking it against `channel_count`.
    ///
    /// `None` is always accepted and deselects every input.
    ///
    /// # Errors
    /// Returns an error, leaving the current selection untouched, if the requested index is
    /// not smaller than `channel_count`.
    fn update_selection(&mut self, selection: Option<usize>, channel_count: usize) -> Outcome {
        // Validate first; `ensure!` returns early so the assignment below is skipped on error.
        if let Some(selection) = selection {
            ensure!(
                selection < channel_count,
                "invalid input channel {selection}. channel count: {}",
                channel_count
            );
        }
        self.selection = selection;
        SUCCESS
    }
}