use core::marker::PhantomData;
use nodo::{channels::SyncResult, prelude::*};
#[derive(Config, Default)]
pub struct JoinConfig {
#[hidden]
pub input_count: usize,
}
pub struct Join<T>(PhantomData<T>);
impl<T: Send + Sync + Clone> Default for Join<T> {
fn default() -> Self {
Self(PhantomData)
}
}
impl<T: Send + Sync + Clone> Codelet for Join<T> {
type Status = DefaultStatus;
type Config = JoinConfig;
type Rx = JoinRx<T>;
type Tx = DoubleBufferTx<T>;
type Signals = ();
fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
(
JoinRx::new(cfg.input_count),
DoubleBufferTx::new_auto_size(),
)
}
fn step(&mut self, _: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
for channel in rx.inputs.iter_mut() {
tx.push_many(channel.drain(..))?;
}
SUCCESS
}
}
pub struct JoinRx<T> {
inputs: Vec<DoubleBufferRx<T>>,
names: Vec<String>,
}
impl<T> JoinRx<T> {
pub fn new(count: usize) -> Self {
Self {
inputs: (0..count)
.map(|_| DoubleBufferRx::new_auto_size())
.collect(),
names: (0..count).map(|i| format!("{i}")).collect(),
}
}
pub fn channel_mut(&mut self, index: usize) -> &mut DoubleBufferRx<T> {
&mut self.inputs[index]
}
pub fn new_channel_mut(&mut self) -> &mut DoubleBufferRx<T> {
self.inputs.push(DoubleBufferRx::new_auto_size());
self.names.push(format!("{}", self.names.len()));
self.inputs.last_mut().unwrap()
}
}
impl<T: Send + Sync> nodo::channels::RxBundle for JoinRx<T> {
fn channel_count(&self) -> usize {
self.inputs.len()
}
fn name(&self, index: usize) -> &str {
&self.names[index]
}
fn inbox_message_count(&self, index: usize) -> usize {
self.inputs[index].len()
}
fn sync_all(&mut self, results: &mut [SyncResult]) {
for (i, channel) in self.inputs.iter_mut().enumerate() {
results[i] = channel.sync()
}
}
fn check_connection(&self) -> nodo::channels::ConnectionCheck {
let mut cc = nodo::channels::ConnectionCheck::new(self.inputs.len());
for (i, channel) in self.inputs.iter().enumerate() {
cc.mark(i, channel.is_connected());
}
cc
}
}