use core::marker::PhantomData;
use nodo::prelude::*;
pub struct Pipe<T, S, F> {
callback: F,
marker: PhantomData<(T, S)>,
}
#[derive(Config)]
pub enum PipeConfig {
OneToOne,
Dynamic,
}
impl<T, S, F> Pipe<T, S, F> {
pub fn new(callback: F) -> Self {
Self {
callback,
marker: PhantomData,
}
}
}
impl<T, S, F> Codelet for Pipe<T, S, F>
where
T: Send + Sync,
S: Clone + Send + Sync,
F: FnMut(T) -> S + Send,
{
type Status = DefaultStatus;
type Config = PipeConfig;
type Rx = DoubleBufferRx<T>;
type Tx = DoubleBufferTx<S>;
type Signals = ();
fn build_bundles(config: &Self::Config) -> (Self::Rx, Self::Tx) {
match config {
PipeConfig::OneToOne => (
DoubleBufferRx::new(OverflowPolicy::Reject(1), RetentionPolicy::EnforceEmpty),
DoubleBufferTx::new(1),
),
PipeConfig::Dynamic => (
DoubleBufferRx::new_auto_size(),
DoubleBufferTx::new_auto_size(),
),
}
}
fn start(&mut self, _cx: Context<Self>, rx: &mut Self::Rx, _tx: &mut Self::Tx) -> Outcome {
while let Some(_) = rx.try_pop() {}
SUCCESS
}
fn step(&mut self, ctx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
match ctx.config {
PipeConfig::OneToOne => {
if let Some(msg) = rx.try_pop() {
tx.push((self.callback)(msg))?;
SUCCESS
} else {
SKIPPED
}
}
PipeConfig::Dynamic => {
let skipped = rx.is_empty();
while let Some(msg) = rx.try_pop() {
tx.push((self.callback)(msg))?;
}
if skipped {
SKIPPED
} else {
SUCCESS
}
}
}
}
}